diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 78caa4a626..3f4dcb3c0b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -66,6 +66,7 @@ import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.service.component.ComponentDiscoveryService; +import org.thingsboard.server.service.edge.rpc.EdgeRpcService; import org.thingsboard.server.service.encoding.DataDecodingEncodingService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.executors.ExternalCallExecutorService; @@ -254,15 +255,14 @@ public class ActorSystemContext { @Getter private TbCoreDeviceRpcService tbCoreDeviceRpcService; - @Lazy - @Autowired - @Getter - private EdgeService edgeService; + @Autowired(required = false) + @Getter private EdgeService edgeService; - @Lazy - @Autowired - @Getter - private EdgeEventService edgeEventService; + @Autowired(required = false) + @Getter private EdgeEventService edgeEventService; + + @Autowired(required = false) + @Getter private EdgeRpcService edgeRpcService; @Value("${actors.session.max_concurrent_sessions_per_device:1}") @Getter diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 7ce5fb3206..fde6f688c5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -31,10 +31,13 @@ import org.thingsboard.server.actors.service.ContextBasedCreator; import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.common.msg.MsgType; @@ -47,6 +50,7 @@ import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.service.edge.rpc.EdgeRpcService; import java.util.List; import java.util.Optional; @@ -202,7 +206,18 @@ public class TenantActor extends RuleChainManagerActor { } private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) { - if (isRuleEngineForCurrentTenant) { + if (msg.getEntityId().getEntityType() == EntityType.EDGE) { + EdgeId edgeId = new EdgeId(msg.getEntityId().getId()); + EdgeRpcService edgeRpcService = systemContext.getEdgeRpcService(); + if (msg.getEvent() == ComponentLifecycleEvent.DELETED) { + edgeRpcService.deleteEdge(edgeId); + } else { + Edge edge = systemContext.getEdgeService().findEdgeById(tenantId, edgeId); + if (msg.getEvent() == ComponentLifecycleEvent.UPDATED) { + edgeRpcService.updateEdge(edge); + } + } + } else if (isRuleEngineForCurrentTenant) { TbActorRef target = getEntityActorRef(msg.getEntityId()); if (target != null) { if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN) { diff --git a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java index b6f2c31561..3c17176b88 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java @@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageLink; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.exception.IncorrectParameterException; @@ -101,6 +102,9 @@ public class EdgeController extends BaseController { edgeService.assignDefaultRuleChainsToEdge(tenantId, savedEdge.getId()); } + tbClusterService.onEntityStateChange(savedEdge.getTenantId(), savedEdge.getId(), + created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); + logEntityAction(savedEdge.getId(), savedEdge, null, created ? ActionType.ADDED : ActionType.UPDATED, null); return savedEdge; } catch (Exception e) { @@ -120,6 +124,9 @@ public class EdgeController extends BaseController { Edge edge = checkEdgeId(edgeId, Operation.DELETE); edgeService.deleteEdge(getTenantId(), edgeId); + tbClusterService.onEntityStateChange(getTenantId(), edgeId, + ComponentLifecycleEvent.DELETED); + logEntityAction(edgeId, edge, null, ActionType.DELETED, null, strEdgeId); @@ -284,6 +291,8 @@ public class EdgeController extends BaseController { Edge updatedEdge = edgeNotificationService.setEdgeRootRuleChain(getTenantId(), edge, ruleChainId); + tbClusterService.onEntityStateChange(updatedEdge.getTenantId(), updatedEdge.getId(), ComponentLifecycleEvent.UPDATED); + logEntityAction(updatedEdge.getId(), updatedEdge, null, ActionType.UPDATED, null); return updatedEdge; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 3792fbdf73..8e2990c75f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -27,6 +27,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; @@ -52,7 +53,7 @@ import java.util.concurrent.Executors; @Service @Slf4j @ConditionalOnProperty(prefix = "edges.rpc", value = "enabled", havingValue = "true") -public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase { +public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService { private final Map sessions = new ConcurrentHashMap<>(); private static final ObjectMapper mapper = new ObjectMapper(); @@ -117,6 +118,23 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase { return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper).getInputStream(); } + @Override + public void updateEdge(Edge edge) { + EdgeGrpcSession session = sessions.get(edge.getId()); + if (session != null && session.isConnected()) { + session.onConfigurationUpdate(edge); + } + } + + @Override + public void deleteEdge(EdgeId edgeId) { + EdgeGrpcSession session = sessions.get(edgeId); + if (session != null && session.isConnected()) { + session.close(); + sessions.remove(edgeId); + } + } + private void onEdgeConnect(EdgeId edgeId, EdgeGrpcSession edgeGrpcSession) { sessions.put(edgeId, edgeGrpcSession); save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, true); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 9a5632bde4..72014cf1c9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -192,6 +192,20 @@ public final class EdgeGrpcSession implements Closeable { }; } + void onConfigurationUpdate(Edge edge) { + try { + this.edge = edge; + // TODO: voba - push edge configuration update to edge +// outputStream.onNext(org.thingsboard.server.gen.integration.ResponseMsg.newBuilder() +// .setIntegrationUpdateMsg(IntegrationUpdateMsg.newBuilder() +// .setConfiguration(constructIntegrationConfigProto(configuration, defaultConverterProto, downLinkConverterProto)) +// .build()) +// .build()); + } catch (Exception e) { + log.error("Failed to construct proto objects!", e); + } + } + void processHandleMessages() throws ExecutionException, InterruptedException { Long queueStartTs = getQueueStartTs().get(); TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), queueStartTs, null, true); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java new file mode 100644 index 0000000000..2355a2e49a --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc; + +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.id.EdgeId; + +public interface EdgeRpcService { + + void updateEdge(Edge edge); + + void deleteEdge(EdgeId edgeId); +}