From 6182b622dd827b1bdbffbe97b522bd2bbc79d581 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 31 Jul 2020 10:13:03 +0300 Subject: [PATCH 1/2] Introduced Edge Lifecycle. Update Edge session configuration --- .../server/actors/ActorSystemContext.java | 16 ++++---- .../server/actors/tenant/TenantActor.java | 17 +++++++- .../server/controller/EdgeController.java | 9 ++++ .../service/edge/rpc/EdgeGrpcService.java | 20 ++++++++- .../service/edge/rpc/EdgeGrpcSession.java | 14 +++++++ .../service/edge/rpc/EdgeRpcService.java | 41 +++++++++++++++++++ 6 files changed, 107 insertions(+), 10 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 78caa4a626..3f4dcb3c0b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -66,6 +66,7 @@ import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.service.component.ComponentDiscoveryService; +import org.thingsboard.server.service.edge.rpc.EdgeRpcService; import org.thingsboard.server.service.encoding.DataDecodingEncodingService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.executors.ExternalCallExecutorService; @@ -254,15 +255,14 @@ public class ActorSystemContext { @Getter private TbCoreDeviceRpcService tbCoreDeviceRpcService; - @Lazy - @Autowired - @Getter - private EdgeService edgeService; + @Autowired(required = false) + @Getter private EdgeService edgeService; - @Lazy - @Autowired - @Getter - private EdgeEventService edgeEventService; + @Autowired(required = false) + @Getter private EdgeEventService edgeEventService; + + @Autowired(required = false) + @Getter private EdgeRpcService edgeRpcService; @Value("${actors.session.max_concurrent_sessions_per_device:1}") @Getter diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 7ce5fb3206..fde6f688c5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -31,10 +31,13 @@ import org.thingsboard.server.actors.service.ContextBasedCreator; import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.common.msg.MsgType; @@ -47,6 +50,7 @@ import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.service.edge.rpc.EdgeRpcService; import java.util.List; import java.util.Optional; @@ -202,7 +206,18 @@ public class TenantActor extends RuleChainManagerActor { } private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) { - if (isRuleEngineForCurrentTenant) { + if (msg.getEntityId().getEntityType() == EntityType.EDGE) { + EdgeId edgeId = new EdgeId(msg.getEntityId().getId()); + EdgeRpcService edgeRpcService = systemContext.getEdgeRpcService(); + if (msg.getEvent() == ComponentLifecycleEvent.DELETED) { + edgeRpcService.deleteEdge(edgeId); + } else { + Edge edge = systemContext.getEdgeService().findEdgeById(tenantId, edgeId); + if (msg.getEvent() == ComponentLifecycleEvent.UPDATED) { + edgeRpcService.updateEdge(edge); + } + } + } else if (isRuleEngineForCurrentTenant) { TbActorRef target = getEntityActorRef(msg.getEntityId()); if (target != null) { if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN) { diff --git a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java index b6f2c31561..3c17176b88 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java @@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageLink; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.exception.IncorrectParameterException; @@ -101,6 +102,9 @@ public class EdgeController extends BaseController { edgeService.assignDefaultRuleChainsToEdge(tenantId, savedEdge.getId()); } + tbClusterService.onEntityStateChange(savedEdge.getTenantId(), savedEdge.getId(), + created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); + logEntityAction(savedEdge.getId(), savedEdge, null, created ? ActionType.ADDED : ActionType.UPDATED, null); return savedEdge; } catch (Exception e) { @@ -120,6 +124,9 @@ public class EdgeController extends BaseController { Edge edge = checkEdgeId(edgeId, Operation.DELETE); edgeService.deleteEdge(getTenantId(), edgeId); + tbClusterService.onEntityStateChange(getTenantId(), edgeId, + ComponentLifecycleEvent.DELETED); + logEntityAction(edgeId, edge, null, ActionType.DELETED, null, strEdgeId); @@ -284,6 +291,8 @@ public class EdgeController extends BaseController { Edge updatedEdge = edgeNotificationService.setEdgeRootRuleChain(getTenantId(), edge, ruleChainId); + tbClusterService.onEntityStateChange(updatedEdge.getTenantId(), updatedEdge.getId(), ComponentLifecycleEvent.UPDATED); + logEntityAction(updatedEdge.getId(), updatedEdge, null, ActionType.UPDATED, null); return updatedEdge; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 3792fbdf73..8e2990c75f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -27,6 +27,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; @@ -52,7 +53,7 @@ import java.util.concurrent.Executors; @Service @Slf4j @ConditionalOnProperty(prefix = "edges.rpc", value = "enabled", havingValue = "true") -public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase { +public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService { private final Map sessions = new ConcurrentHashMap<>(); private static final ObjectMapper mapper = new ObjectMapper(); @@ -117,6 +118,23 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase { return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper).getInputStream(); } + @Override + public void updateEdge(Edge edge) { + EdgeGrpcSession session = sessions.get(edge.getId()); + if (session != null && session.isConnected()) { + session.onConfigurationUpdate(edge); + } + } + + @Override + public void deleteEdge(EdgeId edgeId) { + EdgeGrpcSession session = sessions.get(edgeId); + if (session != null && session.isConnected()) { + session.close(); + sessions.remove(edgeId); + } + } + private void onEdgeConnect(EdgeId edgeId, EdgeGrpcSession edgeGrpcSession) { sessions.put(edgeId, edgeGrpcSession); save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, true); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 9a5632bde4..72014cf1c9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -192,6 +192,20 @@ public final class EdgeGrpcSession implements Closeable { }; } + void onConfigurationUpdate(Edge edge) { + try { + this.edge = edge; + // TODO: voba - push edge configuration update to edge +// outputStream.onNext(org.thingsboard.server.gen.integration.ResponseMsg.newBuilder() +// .setIntegrationUpdateMsg(IntegrationUpdateMsg.newBuilder() +// .setConfiguration(constructIntegrationConfigProto(configuration, defaultConverterProto, downLinkConverterProto)) +// .build()) +// .build()); + } catch (Exception e) { + log.error("Failed to construct proto objects!", e); + } + } + void processHandleMessages() throws ExecutionException, InterruptedException { Long queueStartTs = getQueueStartTs().get(); TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), queueStartTs, null, true); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java new file mode 100644 index 0000000000..27cd81250b --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java @@ -0,0 +1,41 @@ +/** + * ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL + * + * Copyright © 2016-2020 ThingsBoard, Inc. All Rights Reserved. + * + * NOTICE: All information contained herein is, and remains + * the property of ThingsBoard, Inc. and its suppliers, + * if any. The intellectual and technical concepts contained + * herein are proprietary to ThingsBoard, Inc. + * and its suppliers and may be covered by U.S. and Foreign Patents, + * patents in process, and are protected by trade secret or copyright law. + * + * Dissemination of this information or reproduction of this material is strictly forbidden + * unless prior written permission is obtained from COMPANY. + * + * Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees, + * managers or contractors who have executed Confidentiality and Non-disclosure agreements + * explicitly covering such access. + * + * The copyright notice above does not evidence any actual or intended publication + * or disclosure of this source code, which includes + * information that is confidential and/or proprietary, and is a trade secret, of COMPANY. + * ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, + * OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT + * THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED, + * AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES. + * THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION + * DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, + * OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART. + */ +package org.thingsboard.server.service.edge.rpc; + +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.id.EdgeId; + +public interface EdgeRpcService { + + void updateEdge(Edge edge); + + void deleteEdge(EdgeId edgeId); +} From ce746b4f91b0bff088e18869825b30012846a1bf Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 31 Jul 2020 11:13:07 +0300 Subject: [PATCH 2/2] Fixed license header --- .../service/edge/rpc/EdgeRpcService.java | 35 ++++++------------- 1 file changed, 10 insertions(+), 25 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java index 27cd81250b..2355a2e49a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java @@ -1,32 +1,17 @@ /** - * ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL + * Copyright © 2016-2020 The Thingsboard Authors * - * Copyright © 2016-2020 ThingsBoard, Inc. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * NOTICE: All information contained herein is, and remains - * the property of ThingsBoard, Inc. and its suppliers, - * if any. The intellectual and technical concepts contained - * herein are proprietary to ThingsBoard, Inc. - * and its suppliers and may be covered by U.S. and Foreign Patents, - * patents in process, and are protected by trade secret or copyright law. + * http://www.apache.org/licenses/LICENSE-2.0 * - * Dissemination of this information or reproduction of this material is strictly forbidden - * unless prior written permission is obtained from COMPANY. - * - * Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees, - * managers or contractors who have executed Confidentiality and Non-disclosure agreements - * explicitly covering such access. - * - * The copyright notice above does not evidence any actual or intended publication - * or disclosure of this source code, which includes - * information that is confidential and/or proprietary, and is a trade secret, of COMPANY. - * ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, - * OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT - * THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED, - * AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES. - * THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION - * DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, - * OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.thingsboard.server.service.edge.rpc;