From f29d15d8b7dab6061d7c306defca36827dd41cee Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 14 Aug 2020 12:25:57 +0300 Subject: [PATCH] Add/delete users to edge on assign/unassign to/from customer --- .../server/controller/BaseController.java | 8 +++ .../server/controller/EdgeController.java | 6 ++ .../edge/DefaultEdgeNotificationService.java | 43 ++++++++++- .../service/edge/EdgeContextComponent.java | 5 ++ .../service/edge/rpc/EdgeGrpcSession.java | 35 +++++++++ .../CustomerUpdateMsgConstructor.java | 72 +++++++++++++++++++ .../constructor/EntityDataMsgConstructor.java | 6 +- .../constructor/UserUpdateMsgConstructor.java | 4 ++ .../edge/rpc/init/DefaultSyncEdgeService.java | 1 + common/edge-api/src/main/proto/edge.proto | 12 ++-- 10 files changed, 184 insertions(+), 8 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/CustomerUpdateMsgConstructor.java diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index c2c6771cf1..390b2187fa 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -718,6 +718,14 @@ public abstract class BaseController { return result; } + protected void sendNotificationMsgToEdgeService(TenantId tenantId, EdgeId edgeId, CustomerId customerId, ActionType edgeEventAction) { + try { + sendNotificationMsgToEdgeService(tenantId, edgeId, null, json.writeValueAsString(customerId), EdgeEventType.EDGE, edgeEventAction); + } catch (Exception e) { + log.warn("Failed to push assign/unassign to/from customer to core: {}", customerId, e); + } + } + protected void sendNotificationMsgToEdgeService(TenantId tenantId, EntityRelation relation, ActionType edgeEventAction) { try { if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) && diff --git a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java index 3c17176b88..19b82b3b85 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java @@ -178,6 +178,9 @@ public class EdgeController extends BaseController { savedEdge.getCustomerId(), ActionType.ASSIGNED_TO_CUSTOMER, null, strEdgeId, strCustomerId, customer.getName()); + sendNotificationMsgToEdgeService(savedEdge.getTenantId(), savedEdge.getId(), + customerId, ActionType.ASSIGNED_TO_CUSTOMER); + return savedEdge; } catch (Exception e) { logEntityAction(emptyId(EntityType.EDGE), null, @@ -206,6 +209,9 @@ public class EdgeController extends BaseController { edge.getCustomerId(), ActionType.UNASSIGNED_FROM_CUSTOMER, null, strEdgeId, customer.getId().toString(), customer.getName()); + sendNotificationMsgToEdgeService(savedEdge.getTenantId(), savedEdge.getId(), + edge.getCustomerId(), ActionType.UNASSIGNED_FROM_CUSTOMER); + return savedEdge; } catch (Exception e) { logEntityAction(emptyId(EntityType.EDGE), null, diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index b4be8a8e82..279ec96064 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -157,8 +157,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { TenantId tenantId = new TenantId(new UUID(edgeNotificationMsg.getTenantIdMSB(), edgeNotificationMsg.getTenantIdLSB())); EdgeEventType edgeEventType = EdgeEventType.valueOf(edgeNotificationMsg.getEdgeEventType()); switch (edgeEventType) { - // TODO: voba - handle edge updates - // case EDGE: + case EDGE: + processEdge(tenantId, edgeNotificationMsg); + break; case USER: case ASSET: case DEVICE: @@ -186,6 +187,44 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { } } + private void processEdge(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { + // TODO: voba - handle edge updates + try { + ActionType edgeEventActionType = ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction()); + EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB())); + switch (edgeEventActionType) { + case ASSIGNED_TO_CUSTOMER: + case UNASSIGNED_FROM_CUSTOMER: + CustomerId customerId = mapper.readValue(edgeNotificationMsg.getEntityBody(), CustomerId.class); + ListenableFuture edgeFuture = edgeService.findEdgeByIdAsync(tenantId, edgeId); + Futures.addCallback(edgeFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable Edge edge) { + if (edge != null && customerId != null && !EntityId.NULL_UUID.equals(customerId.getId())) { + ActionType actionType = ActionType.ASSIGNED_TO_CUSTOMER.equals(edgeEventActionType) ? ActionType.ADDED : ActionType.DELETED; + saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, actionType, customerId, null); + TextPageData pageData = userService.findCustomerUsers(tenantId, customerId, new TextPageLink(Integer.MAX_VALUE)); + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { + log.trace("[{}] [{}] user(s) are going to be {} to edge.", edge.getId(), pageData.getData().size(), actionType.name()); + for (User user : pageData.getData()) { + saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, actionType, user.getId(), null); + } + } + } + } + + @Override + public void onFailure(Throwable t) { + log.error("Can't find edge by id [{}]", edgeNotificationMsg, t); + } + }, dbCallbackExecutorService); + break; + } + } catch (Exception e) { + log.error("Exception during processing edge event", e); + } + } + private void processEntity(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { ActionType edgeEventActionType = ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction()); EdgeEventType edgeEventType = EdgeEventType.valueOf(edgeNotificationMsg.getEdgeEventType()); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java index fcbc8ff756..2b756cefc4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java @@ -41,6 +41,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.edge.rpc.EdgeEventStorageSettings; import org.thingsboard.server.service.edge.rpc.constructor.AlarmUpdateMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AssetUpdateMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.CustomerUpdateMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.DashboardUpdateMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.DeviceUpdateMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.EntityDataMsgConstructor; @@ -167,6 +168,10 @@ public class EdgeContextComponent { @Autowired private DashboardUpdateMsgConstructor dashboardUpdateMsgConstructor; + @Lazy + @Autowired + private CustomerUpdateMsgConstructor customerUpdateMsgConstructor; + @Lazy @Autowired private UserUpdateMsgConstructor userUpdateMsgConstructor; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 2586d4cb7d..8dcecfb36d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -30,6 +30,7 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.RandomStringUtils; import org.checkerframework.checker.nullness.qual.Nullable; +import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; @@ -82,6 +83,7 @@ import org.thingsboard.server.gen.edge.AttributesRequestMsg; import org.thingsboard.server.gen.edge.ConnectRequestMsg; import org.thingsboard.server.gen.edge.ConnectResponseCode; import org.thingsboard.server.gen.edge.ConnectResponseMsg; +import org.thingsboard.server.gen.edge.CustomerUpdateMsg; import org.thingsboard.server.gen.edge.DashboardUpdateMsg; import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg; import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg; @@ -348,6 +350,9 @@ public final class EdgeGrpcSession implements Closeable { case DASHBOARD: processDashboard(edgeEvent, msgType, edgeEventAction); break; + case CUSTOMER: + processCustomer(edgeEvent, msgType, edgeEventAction); + break; case RULE_CHAIN: processRuleChain(edgeEvent, msgType, edgeEventAction); break; @@ -510,6 +515,36 @@ public final class EdgeGrpcSession implements Closeable { } } + private void processCustomer(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeEventAction) { + CustomerId customerId = new CustomerId(edgeEvent.getEntityId()); + EntityUpdateMsg entityUpdateMsg = null; + switch (edgeEventAction) { + case ADDED: + case UPDATED: + Customer customer = ctx.getCustomerService().findCustomerById(edgeEvent.getTenantId(), customerId); + if (customer != null) { + CustomerUpdateMsg customerUpdateMsg = + ctx.getCustomerUpdateMsgConstructor().constructCustomerUpdatedMsg(msgType, customer); + entityUpdateMsg = EntityUpdateMsg.newBuilder() + .setCustomerUpdateMsg(customerUpdateMsg) + .build(); + } + break; + case DELETED: + CustomerUpdateMsg customerUpdateMsg = + ctx.getCustomerUpdateMsgConstructor().constructCustomerDeleteMsg(customerId); + entityUpdateMsg = EntityUpdateMsg.newBuilder() + .setCustomerUpdateMsg(customerUpdateMsg) + .build(); + break; + } + if (entityUpdateMsg != null) { + outputStream.onNext(ResponseMsg.newBuilder() + .setEntityUpdateMsg(entityUpdateMsg) + .build()); + } + } + private void processRuleChain(EdgeEvent edgeEvent, UpdateMsgType msgType, ActionType edgeEventAction) { RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId()); EntityUpdateMsg entityUpdateMsg = null; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/CustomerUpdateMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/CustomerUpdateMsgConstructor.java new file mode 100644 index 0000000000..a5f54fb443 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/CustomerUpdateMsgConstructor.java @@ -0,0 +1,72 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.constructor; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.Customer; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.dao.util.mapping.JacksonUtil; +import org.thingsboard.server.gen.edge.CustomerUpdateMsg; +import org.thingsboard.server.gen.edge.UpdateMsgType; + +@Component +@Slf4j +public class CustomerUpdateMsgConstructor { + + public CustomerUpdateMsg constructCustomerUpdatedMsg(UpdateMsgType msgType, Customer customer) { + CustomerUpdateMsg.Builder builder = CustomerUpdateMsg.newBuilder() + .setMsgType(msgType) + .setIdMSB(customer.getId().getId().getMostSignificantBits()) + .setIdLSB(customer.getId().getId().getLeastSignificantBits()) + .setTitle(customer.getTitle()); + if (customer.getCountry() != null) { + builder.setCountry(customer.getCountry()); + } + if (customer.getState() != null) { + builder.setState(customer.getState()); + } + if (customer.getCity() != null) { + builder.setCity(customer.getCity()); + } + if (customer.getAddress() != null) { + builder.setAddress(customer.getAddress()); + } + if (customer.getAddress2() != null) { + builder.setAddress2(customer.getAddress2()); + } + if (customer.getZip() != null) { + builder.setZip(customer.getZip()); + } + if (customer.getPhone() != null) { + builder.setPhone(customer.getPhone()); + } + if (customer.getEmail() != null) { + builder.setEmail(customer.getEmail()); + } + if (customer.getAdditionalInfo() != null) { + builder.setAdditionalInfo(JacksonUtil.toString(customer.getAdditionalInfo())); + } + return builder.build(); + } + + public CustomerUpdateMsg constructCustomerDeleteMsg(CustomerId customerId) { + return CustomerUpdateMsg.newBuilder() + .setMsgType(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE) + .setIdMSB(customerId.getId().getMostSignificantBits()) + .setIdLSB(customerId.getId().getLeastSignificantBits()).build(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EntityDataMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EntityDataMsgConstructor.java index ff342faaa0..c1d211af58 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EntityDataMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/EntityDataMsgConstructor.java @@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc.constructor; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonElement; +import com.google.gson.JsonNull; import com.google.gson.JsonObject; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -42,7 +43,10 @@ public class EntityDataMsgConstructor { case TIMESERIES_UPDATED: try { JsonObject data = entityData.getAsJsonObject(); - long ts = data.getAsJsonPrimitive("ts").getAsLong(); + long ts = System.currentTimeMillis(); + if (data.get("ts") != null && !data.get("ts").isJsonNull()) { + ts = data.getAsJsonObject("ts").getAsLong(); + } builder.setPostTelemetryMsg(JsonConverter.convertToTelemetryProto(data.getAsJsonObject("data"), ts)); } catch (Exception e) { log.warn("Can't convert to telemetry proto, entityData [{}]", entityData, e); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/UserUpdateMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/UserUpdateMsgConstructor.java index af42448e19..b50e0c2b5e 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/UserUpdateMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/UserUpdateMsgConstructor.java @@ -36,6 +36,10 @@ public class UserUpdateMsgConstructor { .setIdLSB(user.getId().getId().getLeastSignificantBits()) .setEmail(user.getEmail()) .setAuthority(user.getAuthority().name()); + if (user.getCustomerId() != null) { + builder.setCustomerIdMSB(user.getCustomerId().getId().getMostSignificantBits()); + builder.setCustomerIdLSB(user.getCustomerId().getId().getLeastSignificantBits()); + } if (user.getFirstName() != null) { builder.setFirstName(user.getFirstName()); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java index 0667da992a..bf20fc2051 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java @@ -264,6 +264,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { TextPageData pageData = userService.findTenantAdmins(edge.getTenantId(), new TextPageLink(Integer.MAX_VALUE)); pushUsersToEdge(pageData, edge); if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) { + saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, ActionType.ADDED, edge.getCustomerId(), null); pageData = userService.findCustomerUsers(edge.getTenantId(), edge.getCustomerId(), new TextPageLink(Integer.MAX_VALUE)); pushUsersToEdge(pageData, edge); } diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index fb9ff9dbbf..872af2a529 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -261,11 +261,13 @@ message UserUpdateMsg { UpdateMsgType msgType = 1; int64 idMSB = 2; int64 idLSB = 3; - string email = 4; - string authority = 5; - string firstName = 6; - string lastName = 7; - string additionalInfo = 8; + int64 customerIdMSB = 4; + int64 customerIdLSB = 5; + string email = 6; + string authority = 7; + string firstName = 8; + string lastName = 9; + string additionalInfo = 10; } message WidgetsBundleUpdateMsg {