diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 2817b1e359..4583c78d11 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -36,7 +36,6 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.aware.TenantAwareMsg; -import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; import org.thingsboard.server.common.msg.queue.RuleEngineException; @@ -105,7 +104,9 @@ public class AppActor extends ContextAwareActor { onToDeviceActorMsg((TenantAwareMsg) msg, true); break; case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG: - onToTenantActorMsg((EdgeEventUpdateMsg) msg); + case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG: + case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG: + onToTenantActorMsg((TenantAwareMsg) msg); break; case SESSION_TIMEOUT_MSG: ctx.broadcastToChildrenByType(msg, EntityType.TENANT); @@ -193,7 +194,7 @@ public class AppActor extends ContextAwareActor { () -> new TenantActor.ActorCreator(systemContext, tenantId)); } - private void onToTenantActorMsg(EdgeEventUpdateMsg msg) { + private void onToTenantActorMsg(TenantAwareMsg msg) { TbActorRef target = null; if (ModelConstants.SYSTEM_TENANT.equals(msg.getTenantId())) { log.warn("Message has system tenant id: {}", msg); diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index c22b44d7cf..5906b73db8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -48,6 +48,8 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.aware.DeviceAwareMsg; import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; +import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; +import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; @@ -167,7 +169,9 @@ public class TenantActor extends RuleChainManagerActor { onRuleChainMsg((RuleChainAwareMsg) msg); break; case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG: - onToEdgeSessionMsg((EdgeEventUpdateMsg) msg); + case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG: + case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG: + onToEdgeSessionMsg(msg); break; default: return false; @@ -271,9 +275,24 @@ public class TenantActor extends RuleChainManagerActor { () -> new DeviceActorCreator(systemContext, tenantId, deviceId)); } - private void onToEdgeSessionMsg(EdgeEventUpdateMsg msg) { - log.trace("[{}] onToEdgeSessionMsg [{}]", msg.getTenantId(), msg); - systemContext.getEdgeRpcService().onEdgeEvent(tenantId, msg.getEdgeId()); + private void onToEdgeSessionMsg(TbActorMsg msg) { + switch (msg.getMsgType()) { + case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG: + EdgeEventUpdateMsg edgeEventUpdateMsg = (EdgeEventUpdateMsg) msg; + log.trace("[{}] onToEdgeSessionMsg [{}]", edgeEventUpdateMsg.getTenantId(), msg); + systemContext.getEdgeRpcService().onEdgeEvent(tenantId, edgeEventUpdateMsg.getEdgeId()); + break; + case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG: + ToEdgeSyncRequest toEdgeSyncRequest = (ToEdgeSyncRequest) msg; + log.trace("[{}] toEdgeSyncRequest [{}]", toEdgeSyncRequest.getTenantId(), msg); + systemContext.getEdgeRpcService().startSyncProcess(tenantId, toEdgeSyncRequest.getEdgeId(), toEdgeSyncRequest.getId()); + break; + case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG: + FromEdgeSyncResponse fromEdgeSyncResponse = (FromEdgeSyncResponse) msg; + log.trace("[{}] fromEdgeSyncResponse [{}]", fromEdgeSyncResponse.getTenantId(), msg); + systemContext.getEdgeRpcService().processSyncResponse(fromEdgeSyncResponse); + break; + } } private ApiUsageState getApiUsageState() { diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 1e9958cc25..2e77045771 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -269,10 +269,7 @@ public abstract class BaseController { protected EdgeService edgeService; @Autowired(required = false) - protected EdgeNotificationService edgeNotificationService; - - @Autowired(required = false) - protected EdgeRpcService edgeGrpcService; + protected EdgeRpcService edgeRpcService; @Autowired protected TbNotificationEntityService notificationEntityService; diff --git a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java index 59c9eb39c1..6a5dd44d0b 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java @@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; @@ -32,6 +33,7 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.async.DeferredResult; import org.thingsboard.rule.engine.flow.TbRuleChainInputNode; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.EntitySubtype; @@ -47,6 +49,8 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; +import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.model.ModelConstants; @@ -61,6 +65,7 @@ import org.thingsboard.server.service.security.permission.Resource; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.stream.Collectors; import static org.thingsboard.server.controller.ControllerConstants.CUSTOMER_ID_PARAM_DESCRIPTION; @@ -529,24 +534,35 @@ public class EdgeController extends BaseController { "All entities that are assigned to particular edge are going to be send to remote edge service." + TENANT_AUTHORITY_PARAGRAPH) @PreAuthorize("hasAuthority('TENANT_ADMIN')") @RequestMapping(value = "/edge/sync/{edgeId}", method = RequestMethod.POST) - public void syncEdge(@ApiParam(value = EDGE_ID_PARAM_DESCRIPTION, required = true) + public DeferredResult syncEdge(@ApiParam(value = EDGE_ID_PARAM_DESCRIPTION, required = true) @PathVariable("edgeId") String strEdgeId) throws ThingsboardException { checkParameter("edgeId", strEdgeId); try { + final DeferredResult response = new DeferredResult<>(); if (isEdgesEnabled()) { EdgeId edgeId = new EdgeId(toUUID(strEdgeId)); edgeId = checkNotNull(edgeId); SecurityUser user = getCurrentUser(); TenantId tenantId = user.getTenantId(); - edgeGrpcService.startSyncProcess(tenantId, edgeId); + ToEdgeSyncRequest request = new ToEdgeSyncRequest(UUID.randomUUID(), tenantId, edgeId); + edgeRpcService.processSyncRequest(request, fromEdgeSyncResponse -> reply(response, fromEdgeSyncResponse)); } else { throw new ThingsboardException("Edges support disabled", ThingsboardErrorCode.GENERAL); } + return response; } catch (Exception e) { throw handleException(e); } } + private void reply(DeferredResult response, FromEdgeSyncResponse fromEdgeSyncResponse) { + if (fromEdgeSyncResponse.isSuccess()) { + response.setResult(new ResponseEntity<>(HttpStatus.OK)); + } else { + response.setErrorResult(new ThingsboardException("Edge is not connected", ThingsboardErrorCode.GENERAL)); + } + } + @ApiOperation(value = "Find missing rule chains (findMissingToRelatedRuleChains)", notes = "Returns list of rule chains ids that are not assigned to particular edge, but these rule chains are present in the already assigned rule chains to edge." + TENANT_AUTHORITY_PARAGRAPH) @PreAuthorize("hasAuthority('TENANT_ADMIN')") diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index c56fcd4ed9..18d53fea71 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -26,6 +26,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.ResourceUtils; import org.thingsboard.server.common.data.edge.Edge; @@ -34,6 +35,8 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; +import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.gen.edge.v1.EdgeRpcServiceGrpc; import org.thingsboard.server.gen.edge.v1.RequestMsg; import org.thingsboard.server.gen.edge.v1.ResponseMsg; @@ -59,6 +62,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; @Service @Slf4j @@ -71,6 +75,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private final Map sessionNewEvents = new HashMap<>(); private final ConcurrentMap> sessionEdgeEventChecks = new ConcurrentHashMap<>(); + private final ConcurrentMap> localSyncEdgeRequests = new ConcurrentHashMap<>(); + @Value("${edges.rpc.port}") private int rpcPort; @Value("${edges.rpc.ssl.enabled}") @@ -98,12 +104,17 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i @Autowired private TelemetrySubscriptionService tsSubService; + @Autowired + private TbClusterService clusterService; + private Server server; private ScheduledExecutorService edgeEventProcessingExecutorService; private ScheduledExecutorService sendDownlinkExecutorService; + private ScheduledExecutorService syncScheduler; + @PostConstruct public void init() { log.info("Initializing Edge RPC service!"); @@ -131,6 +142,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } this.edgeEventProcessingExecutorService = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-scheduler")); this.sendDownlinkExecutorService = Executors.newScheduledThreadPool(sendSchedulerPoolSize, ThingsBoardThreadFactory.forName("edge-send-scheduler")); + this.syncScheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("edge-sync-scheduler")); log.info("Edge RPC service initialized!"); } @@ -191,16 +203,19 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i @Override public void onEdgeEvent(TenantId tenantId, EdgeId edgeId) { - log.trace("[{}] onEdgeEvent [{}]", tenantId, edgeId.getId()); - final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); - newEventLock.lock(); - try { - if (Boolean.FALSE.equals(sessionNewEvents.get(edgeId))) { - log.trace("[{}] set session new events flag to true [{}]", tenantId, edgeId.getId()); - sessionNewEvents.put(edgeId, true); + EdgeGrpcSession session = sessions.get(edgeId); + if (session != null && session.isConnected()) { + log.trace("[{}] onEdgeEvent [{}]", tenantId, edgeId.getId()); + final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); + newEventLock.lock(); + try { + if (Boolean.FALSE.equals(sessionNewEvents.get(edgeId))) { + log.trace("[{}] set session new events flag to true [{}]", tenantId, edgeId.getId()); + sessionNewEvents.put(edgeId, true); + } + } finally { + newEventLock.unlock(); } - } finally { - newEventLock.unlock(); } } @@ -221,13 +236,48 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } @Override - public void startSyncProcess(TenantId tenantId, EdgeId edgeId) { + public void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId) { EdgeGrpcSession session = sessions.get(edgeId); - if (session != null && session.isConnected()) { - session.startSyncProcess(tenantId, edgeId); + if (session != null) { + boolean success = false; + if (session.isConnected()) { + session.startSyncProcess(tenantId, edgeId); + success = true; + } + clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success)); + } + } + + @Override + public void processSyncRequest(ToEdgeSyncRequest request, Consumer responseConsumer) { + log.trace("[{}][{}] Processing sync edge request [{}]", request.getTenantId(), request.getId(), request.getEdgeId()); + UUID requestId = request.getId(); + localSyncEdgeRequests.put(requestId, responseConsumer); + clusterService.pushEdgeSyncRequestToCore(request); + scheduleSyncRequestTimeout(request, requestId); + } + + private void scheduleSyncRequestTimeout(ToEdgeSyncRequest request, UUID requestId) { + log.trace("[{}] scheduling sync edge request", requestId); + syncScheduler.schedule(() -> { + log.trace("[{}] checking if sync edge request is not processed...", requestId); + Consumer consumer = localSyncEdgeRequests.remove(requestId); + if (consumer != null) { + log.trace("[{}] timeout for processing sync edge request.", requestId); + consumer.accept(new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false)); + } + }, 10, TimeUnit.SECONDS); + } + + @Override + public void processSyncResponse(FromEdgeSyncResponse response) { + log.trace("[{}] Received response from sync service: [{}]", response.getId(), response); + UUID requestId = response.getId(); + Consumer consumer = localSyncEdgeRequests.remove(requestId); + if (consumer != null) { + consumer.accept(response); } else { - log.error("[{}] Edge is not connected [{}]", tenantId, edgeId); - throw new RuntimeException("Edge is not connected"); + log.trace("[{}] Unknown or stale sync response received [{}]", requestId, response); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java index ad65c15235..9564e195de 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java @@ -18,6 +18,11 @@ package org.thingsboard.server.service.edge.rpc; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; +import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; + +import java.util.UUID; +import java.util.function.Consumer; public interface EdgeRpcService { @@ -27,5 +32,9 @@ public interface EdgeRpcService { void onEdgeEvent(TenantId tenantId, EdgeId edgeId); - void startSyncProcess(TenantId tenantId, EdgeId edgeId); + void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId); + + void processSyncRequest(ToEdgeSyncRequest request, Consumer responseConsumer); + + void processSyncResponse(FromEdgeSyncResponse response); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 062e4128d4..7c2ddf09f9 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -49,6 +49,8 @@ import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; +import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; +import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @@ -351,12 +353,32 @@ public class DefaultTbClusterService implements TbClusterService { log.trace("[{}] Processing edge {} event update ", tenantId, edgeId); EdgeEventUpdateMsg msg = new EdgeEventUpdateMsg(tenantId, edgeId); byte[] msgBytes = encodingService.encode(msg); + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdateMsg(ByteString.copyFrom(msgBytes)).build(); + pushEdgeSyncMsgToCore(edgeId, toCoreMsg); + } + + @Override + public void pushEdgeSyncRequestToCore(ToEdgeSyncRequest toEdgeSyncRequest) { + log.trace("[{}] Processing edge sync request {} ", toEdgeSyncRequest.getTenantId(), toEdgeSyncRequest); + byte[] msgBytes = encodingService.encode(toEdgeSyncRequest); + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToEdgeSyncRequestMsg(ByteString.copyFrom(msgBytes)).build(); + pushEdgeSyncMsgToCore(toEdgeSyncRequest.getEdgeId(), toCoreMsg); + } + + @Override + public void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse) { + log.trace("[{}] Processing edge sync response {}", fromEdgeSyncResponse.getTenantId(), fromEdgeSyncResponse); + byte[] msgBytes = encodingService.encode(fromEdgeSyncResponse); + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setFromEdgeSyncResponseMsg(ByteString.copyFrom(msgBytes)).build(); + pushEdgeSyncMsgToCore(fromEdgeSyncResponse.getEdgeId(), toCoreMsg); + } + + private void pushEdgeSyncMsgToCore(EdgeId edgeId, ToCoreNotificationMsg toCoreMsg) { TbQueueProducer> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer(); Set tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE); for (String serviceId : tbCoreServices) { TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, serviceId); - ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdateMsg(ByteString.copyFrom(msgBytes)).build(); - toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEdgeId().getId(), toCoreMsg), null); + toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(edgeId.getId(), toCoreMsg), null); toCoreNfs.incrementAndGet(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index a3bae8bc4e..745ffa7bab 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -20,8 +20,6 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.event.EventListener; -import org.springframework.core.annotation.Order; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; @@ -76,7 +74,6 @@ import org.thingsboard.server.service.state.DeviceStateService; import org.thingsboard.server.service.subscription.SubscriptionManagerService; import org.thingsboard.server.service.subscription.TbLocalSubscriptionService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; -import org.thingsboard.server.service.sync.vc.EntitiesVersionControlService; import org.thingsboard.server.service.sync.vc.GitVersionControlQueueService; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; @@ -317,13 +314,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(toCoreNotification.getEdgeEventUpdateMsg().toByteArray()); - if (actorMsg.isPresent()) { - log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.tellWithHighPriority(actorMsg.get()); - } - callback.onSuccess(); + } else if (!toCoreNotification.getEdgeEventUpdateMsg().isEmpty()) { + forwardToAppActor(id, encodingService.decode(toCoreNotification.getEdgeEventUpdateMsg().toByteArray()), callback); + } else if (!toCoreNotification.getToEdgeSyncRequestMsg().isEmpty()) { + forwardToAppActor(id, encodingService.decode(toCoreNotification.getToEdgeSyncRequestMsg().toByteArray()), callback); + } else if (!toCoreNotification.getFromEdgeSyncResponseMsg().isEmpty()) { + forwardToAppActor(id, encodingService.decode(toCoreNotification.getFromEdgeSyncResponseMsg().toByteArray()), callback); } else if (toCoreNotification.hasQueueUpdateMsg()) { TransportProtos.QueueUpdateMsg queue = toCoreNotification.getQueueUpdateMsg(); partitionService.updateQueue(queue); @@ -552,6 +548,14 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg, TbCallback callback) { + if (actorMsg.isPresent()) { + log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); + actorContext.tellWithHighPriority(actorMsg.get()); + } + callback.onSuccess(); + } + private void throwNotHandled(Object msg, TbCallback callback) { log.warn("Message not handled: {}", msg); callback.onFailure(new RuntimeException("Message not handled!")); diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java index d258aa3820..b8f814d613 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java @@ -29,6 +29,8 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; +import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; +import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; @@ -88,5 +90,9 @@ public interface TbClusterService extends TbQueueClusterService { void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId); + void pushEdgeSyncRequestToCore(ToEdgeSyncRequest toEdgeSyncRequest); + + void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse); + void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action); } diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 1714e109a3..cb3443a71f 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -932,6 +932,8 @@ message ToCoreNotificationMsg { QueueUpdateMsg queueUpdateMsg = 5; QueueDeleteMsg queueDeleteMsg = 6; VersionControlResponseMsg vcResponseMsg = 7; + bytes toEdgeSyncRequestMsg = 8; + bytes fromEdgeSyncResponseMsg = 9; } /* Messages that are handled by ThingsBoard RuleEngine Service */ diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index 269aa1e727..98c185b6b5 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -122,6 +122,12 @@ public enum MsgType { /** * Message that is sent on Edge Event to Edge Session */ - EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG; + EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG, + + /** + * Messages that are sent to and from edge session to start edge synchronization process + */ + EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG, + EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG; } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java new file mode 100644 index 0000000000..4e6d456d22 --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.edge; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.aware.TenantAwareMsg; +import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg; + +import java.util.UUID; + +@AllArgsConstructor +@Getter +public class FromEdgeSyncResponse implements TenantAwareMsg, ToAllNodesMsg { + + private final UUID id; + private final TenantId tenantId; + private final EdgeId edgeId; + private final boolean success; + + @Override + public MsgType getMsgType() { + return MsgType.EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG; + } +} diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/ToEdgeSyncRequest.java b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/ToEdgeSyncRequest.java new file mode 100644 index 0000000000..6e1b0df3ad --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/ToEdgeSyncRequest.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.edge; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.aware.TenantAwareMsg; +import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg; + +import java.util.UUID; + +@AllArgsConstructor +@Getter +public class ToEdgeSyncRequest implements TenantAwareMsg, ToAllNodesMsg { + private final UUID id; + private final TenantId tenantId; + private final EdgeId edgeId; + + @Override + public MsgType getMsgType() { + return MsgType.EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG; + } +}