Edge sync functionality - added cluster support
This commit is contained in:
parent
abbf791987
commit
faaf07b1ea
@ -36,7 +36,6 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.common.msg.MsgType;
|
||||
import org.thingsboard.server.common.msg.TbActorMsg;
|
||||
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
|
||||
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
|
||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
|
||||
import org.thingsboard.server.common.msg.queue.RuleEngineException;
|
||||
@ -105,7 +104,9 @@ public class AppActor extends ContextAwareActor {
|
||||
onToDeviceActorMsg((TenantAwareMsg) msg, true);
|
||||
break;
|
||||
case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
|
||||
onToTenantActorMsg((EdgeEventUpdateMsg) msg);
|
||||
case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG:
|
||||
case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
|
||||
onToTenantActorMsg((TenantAwareMsg) msg);
|
||||
break;
|
||||
case SESSION_TIMEOUT_MSG:
|
||||
ctx.broadcastToChildrenByType(msg, EntityType.TENANT);
|
||||
@ -193,7 +194,7 @@ public class AppActor extends ContextAwareActor {
|
||||
() -> new TenantActor.ActorCreator(systemContext, tenantId));
|
||||
}
|
||||
|
||||
private void onToTenantActorMsg(EdgeEventUpdateMsg msg) {
|
||||
private void onToTenantActorMsg(TenantAwareMsg msg) {
|
||||
TbActorRef target = null;
|
||||
if (ModelConstants.SYSTEM_TENANT.equals(msg.getTenantId())) {
|
||||
log.warn("Message has system tenant id: {}", msg);
|
||||
|
||||
@ -48,6 +48,8 @@ import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
|
||||
import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
|
||||
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
|
||||
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
|
||||
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
|
||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
|
||||
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
|
||||
@ -167,7 +169,9 @@ public class TenantActor extends RuleChainManagerActor {
|
||||
onRuleChainMsg((RuleChainAwareMsg) msg);
|
||||
break;
|
||||
case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
|
||||
onToEdgeSessionMsg((EdgeEventUpdateMsg) msg);
|
||||
case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG:
|
||||
case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
|
||||
onToEdgeSessionMsg(msg);
|
||||
break;
|
||||
default:
|
||||
return false;
|
||||
@ -271,9 +275,24 @@ public class TenantActor extends RuleChainManagerActor {
|
||||
() -> new DeviceActorCreator(systemContext, tenantId, deviceId));
|
||||
}
|
||||
|
||||
private void onToEdgeSessionMsg(EdgeEventUpdateMsg msg) {
|
||||
log.trace("[{}] onToEdgeSessionMsg [{}]", msg.getTenantId(), msg);
|
||||
systemContext.getEdgeRpcService().onEdgeEvent(tenantId, msg.getEdgeId());
|
||||
private void onToEdgeSessionMsg(TbActorMsg msg) {
|
||||
switch (msg.getMsgType()) {
|
||||
case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
|
||||
EdgeEventUpdateMsg edgeEventUpdateMsg = (EdgeEventUpdateMsg) msg;
|
||||
log.trace("[{}] onToEdgeSessionMsg [{}]", edgeEventUpdateMsg.getTenantId(), msg);
|
||||
systemContext.getEdgeRpcService().onEdgeEvent(tenantId, edgeEventUpdateMsg.getEdgeId());
|
||||
break;
|
||||
case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG:
|
||||
ToEdgeSyncRequest toEdgeSyncRequest = (ToEdgeSyncRequest) msg;
|
||||
log.trace("[{}] toEdgeSyncRequest [{}]", toEdgeSyncRequest.getTenantId(), msg);
|
||||
systemContext.getEdgeRpcService().startSyncProcess(tenantId, toEdgeSyncRequest.getEdgeId(), toEdgeSyncRequest.getId());
|
||||
break;
|
||||
case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
|
||||
FromEdgeSyncResponse fromEdgeSyncResponse = (FromEdgeSyncResponse) msg;
|
||||
log.trace("[{}] fromEdgeSyncResponse [{}]", fromEdgeSyncResponse.getTenantId(), msg);
|
||||
systemContext.getEdgeRpcService().processSyncResponse(fromEdgeSyncResponse);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private ApiUsageState getApiUsageState() {
|
||||
|
||||
@ -269,10 +269,7 @@ public abstract class BaseController {
|
||||
protected EdgeService edgeService;
|
||||
|
||||
@Autowired(required = false)
|
||||
protected EdgeNotificationService edgeNotificationService;
|
||||
|
||||
@Autowired(required = false)
|
||||
protected EdgeRpcService edgeGrpcService;
|
||||
protected EdgeRpcService edgeRpcService;
|
||||
|
||||
@Autowired
|
||||
protected TbNotificationEntityService notificationEntityService;
|
||||
|
||||
@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.security.access.prepost.PreAuthorize;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
@ -32,6 +33,7 @@ import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.ResponseBody;
|
||||
import org.springframework.web.bind.annotation.ResponseStatus;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.context.request.async.DeferredResult;
|
||||
import org.thingsboard.rule.engine.flow.TbRuleChainInputNode;
|
||||
import org.thingsboard.server.common.data.Customer;
|
||||
import org.thingsboard.server.common.data.EntitySubtype;
|
||||
@ -47,6 +49,8 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.rule.RuleChain;
|
||||
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
|
||||
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
|
||||
import org.thingsboard.server.dao.exception.DataValidationException;
|
||||
import org.thingsboard.server.dao.exception.IncorrectParameterException;
|
||||
import org.thingsboard.server.dao.model.ModelConstants;
|
||||
@ -61,6 +65,7 @@ import org.thingsboard.server.service.security.permission.Resource;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.thingsboard.server.controller.ControllerConstants.CUSTOMER_ID_PARAM_DESCRIPTION;
|
||||
@ -529,24 +534,35 @@ public class EdgeController extends BaseController {
|
||||
"All entities that are assigned to particular edge are going to be send to remote edge service." + TENANT_AUTHORITY_PARAGRAPH)
|
||||
@PreAuthorize("hasAuthority('TENANT_ADMIN')")
|
||||
@RequestMapping(value = "/edge/sync/{edgeId}", method = RequestMethod.POST)
|
||||
public void syncEdge(@ApiParam(value = EDGE_ID_PARAM_DESCRIPTION, required = true)
|
||||
public DeferredResult<ResponseEntity> syncEdge(@ApiParam(value = EDGE_ID_PARAM_DESCRIPTION, required = true)
|
||||
@PathVariable("edgeId") String strEdgeId) throws ThingsboardException {
|
||||
checkParameter("edgeId", strEdgeId);
|
||||
try {
|
||||
final DeferredResult<ResponseEntity> response = new DeferredResult<>();
|
||||
if (isEdgesEnabled()) {
|
||||
EdgeId edgeId = new EdgeId(toUUID(strEdgeId));
|
||||
edgeId = checkNotNull(edgeId);
|
||||
SecurityUser user = getCurrentUser();
|
||||
TenantId tenantId = user.getTenantId();
|
||||
edgeGrpcService.startSyncProcess(tenantId, edgeId);
|
||||
ToEdgeSyncRequest request = new ToEdgeSyncRequest(UUID.randomUUID(), tenantId, edgeId);
|
||||
edgeRpcService.processSyncRequest(request, fromEdgeSyncResponse -> reply(response, fromEdgeSyncResponse));
|
||||
} else {
|
||||
throw new ThingsboardException("Edges support disabled", ThingsboardErrorCode.GENERAL);
|
||||
}
|
||||
return response;
|
||||
} catch (Exception e) {
|
||||
throw handleException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void reply(DeferredResult<ResponseEntity> response, FromEdgeSyncResponse fromEdgeSyncResponse) {
|
||||
if (fromEdgeSyncResponse.isSuccess()) {
|
||||
response.setResult(new ResponseEntity<>(HttpStatus.OK));
|
||||
} else {
|
||||
response.setErrorResult(new ThingsboardException("Edge is not connected", ThingsboardErrorCode.GENERAL));
|
||||
}
|
||||
}
|
||||
|
||||
@ApiOperation(value = "Find missing rule chains (findMissingToRelatedRuleChains)",
|
||||
notes = "Returns list of rule chains ids that are not assigned to particular edge, but these rule chains are present in the already assigned rule chains to edge." + TENANT_AUTHORITY_PARAGRAPH)
|
||||
@PreAuthorize("hasAuthority('TENANT_ADMIN')")
|
||||
|
||||
@ -26,6 +26,7 @@ import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.cluster.TbClusterService;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.ResourceUtils;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
@ -34,6 +35,8 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
|
||||
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
|
||||
import org.thingsboard.server.gen.edge.v1.EdgeRpcServiceGrpc;
|
||||
import org.thingsboard.server.gen.edge.v1.RequestMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.ResponseMsg;
|
||||
@ -59,6 +62,7 @@ import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@ -71,6 +75,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
private final Map<EdgeId, Boolean> sessionNewEvents = new HashMap<>();
|
||||
private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>();
|
||||
|
||||
private final ConcurrentMap<UUID, Consumer<FromEdgeSyncResponse>> localSyncEdgeRequests = new ConcurrentHashMap<>();
|
||||
|
||||
@Value("${edges.rpc.port}")
|
||||
private int rpcPort;
|
||||
@Value("${edges.rpc.ssl.enabled}")
|
||||
@ -98,12 +104,17 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
@Autowired
|
||||
private TelemetrySubscriptionService tsSubService;
|
||||
|
||||
@Autowired
|
||||
private TbClusterService clusterService;
|
||||
|
||||
private Server server;
|
||||
|
||||
private ScheduledExecutorService edgeEventProcessingExecutorService;
|
||||
|
||||
private ScheduledExecutorService sendDownlinkExecutorService;
|
||||
|
||||
private ScheduledExecutorService syncScheduler;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
log.info("Initializing Edge RPC service!");
|
||||
@ -131,6 +142,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
}
|
||||
this.edgeEventProcessingExecutorService = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-scheduler"));
|
||||
this.sendDownlinkExecutorService = Executors.newScheduledThreadPool(sendSchedulerPoolSize, ThingsBoardThreadFactory.forName("edge-send-scheduler"));
|
||||
this.syncScheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("edge-sync-scheduler"));
|
||||
log.info("Edge RPC service initialized!");
|
||||
}
|
||||
|
||||
@ -191,16 +203,19 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
|
||||
@Override
|
||||
public void onEdgeEvent(TenantId tenantId, EdgeId edgeId) {
|
||||
log.trace("[{}] onEdgeEvent [{}]", tenantId, edgeId.getId());
|
||||
final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
|
||||
newEventLock.lock();
|
||||
try {
|
||||
if (Boolean.FALSE.equals(sessionNewEvents.get(edgeId))) {
|
||||
log.trace("[{}] set session new events flag to true [{}]", tenantId, edgeId.getId());
|
||||
sessionNewEvents.put(edgeId, true);
|
||||
EdgeGrpcSession session = sessions.get(edgeId);
|
||||
if (session != null && session.isConnected()) {
|
||||
log.trace("[{}] onEdgeEvent [{}]", tenantId, edgeId.getId());
|
||||
final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
|
||||
newEventLock.lock();
|
||||
try {
|
||||
if (Boolean.FALSE.equals(sessionNewEvents.get(edgeId))) {
|
||||
log.trace("[{}] set session new events flag to true [{}]", tenantId, edgeId.getId());
|
||||
sessionNewEvents.put(edgeId, true);
|
||||
}
|
||||
} finally {
|
||||
newEventLock.unlock();
|
||||
}
|
||||
} finally {
|
||||
newEventLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@ -221,13 +236,48 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startSyncProcess(TenantId tenantId, EdgeId edgeId) {
|
||||
public void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId) {
|
||||
EdgeGrpcSession session = sessions.get(edgeId);
|
||||
if (session != null && session.isConnected()) {
|
||||
session.startSyncProcess(tenantId, edgeId);
|
||||
if (session != null) {
|
||||
boolean success = false;
|
||||
if (session.isConnected()) {
|
||||
session.startSyncProcess(tenantId, edgeId);
|
||||
success = true;
|
||||
}
|
||||
clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processSyncRequest(ToEdgeSyncRequest request, Consumer<FromEdgeSyncResponse> responseConsumer) {
|
||||
log.trace("[{}][{}] Processing sync edge request [{}]", request.getTenantId(), request.getId(), request.getEdgeId());
|
||||
UUID requestId = request.getId();
|
||||
localSyncEdgeRequests.put(requestId, responseConsumer);
|
||||
clusterService.pushEdgeSyncRequestToCore(request);
|
||||
scheduleSyncRequestTimeout(request, requestId);
|
||||
}
|
||||
|
||||
private void scheduleSyncRequestTimeout(ToEdgeSyncRequest request, UUID requestId) {
|
||||
log.trace("[{}] scheduling sync edge request", requestId);
|
||||
syncScheduler.schedule(() -> {
|
||||
log.trace("[{}] checking if sync edge request is not processed...", requestId);
|
||||
Consumer<FromEdgeSyncResponse> consumer = localSyncEdgeRequests.remove(requestId);
|
||||
if (consumer != null) {
|
||||
log.trace("[{}] timeout for processing sync edge request.", requestId);
|
||||
consumer.accept(new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false));
|
||||
}
|
||||
}, 10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processSyncResponse(FromEdgeSyncResponse response) {
|
||||
log.trace("[{}] Received response from sync service: [{}]", response.getId(), response);
|
||||
UUID requestId = response.getId();
|
||||
Consumer<FromEdgeSyncResponse> consumer = localSyncEdgeRequests.remove(requestId);
|
||||
if (consumer != null) {
|
||||
consumer.accept(response);
|
||||
} else {
|
||||
log.error("[{}] Edge is not connected [{}]", tenantId, edgeId);
|
||||
throw new RuntimeException("Edge is not connected");
|
||||
log.trace("[{}] Unknown or stale sync response received [{}]", requestId, response);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -18,6 +18,11 @@ package org.thingsboard.server.service.edge.rpc;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
|
||||
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
|
||||
|
||||
import java.util.UUID;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public interface EdgeRpcService {
|
||||
|
||||
@ -27,5 +32,9 @@ public interface EdgeRpcService {
|
||||
|
||||
void onEdgeEvent(TenantId tenantId, EdgeId edgeId);
|
||||
|
||||
void startSyncProcess(TenantId tenantId, EdgeId edgeId);
|
||||
void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId);
|
||||
|
||||
void processSyncRequest(ToEdgeSyncRequest request, Consumer<FromEdgeSyncResponse> responseConsumer);
|
||||
|
||||
void processSyncResponse(FromEdgeSyncResponse response);
|
||||
}
|
||||
|
||||
@ -49,6 +49,8 @@ import org.thingsboard.server.common.data.queue.Queue;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg;
|
||||
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
|
||||
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
|
||||
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
|
||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
@ -351,12 +353,32 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
log.trace("[{}] Processing edge {} event update ", tenantId, edgeId);
|
||||
EdgeEventUpdateMsg msg = new EdgeEventUpdateMsg(tenantId, edgeId);
|
||||
byte[] msgBytes = encodingService.encode(msg);
|
||||
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdateMsg(ByteString.copyFrom(msgBytes)).build();
|
||||
pushEdgeSyncMsgToCore(edgeId, toCoreMsg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushEdgeSyncRequestToCore(ToEdgeSyncRequest toEdgeSyncRequest) {
|
||||
log.trace("[{}] Processing edge sync request {} ", toEdgeSyncRequest.getTenantId(), toEdgeSyncRequest);
|
||||
byte[] msgBytes = encodingService.encode(toEdgeSyncRequest);
|
||||
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToEdgeSyncRequestMsg(ByteString.copyFrom(msgBytes)).build();
|
||||
pushEdgeSyncMsgToCore(toEdgeSyncRequest.getEdgeId(), toCoreMsg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse) {
|
||||
log.trace("[{}] Processing edge sync response {}", fromEdgeSyncResponse.getTenantId(), fromEdgeSyncResponse);
|
||||
byte[] msgBytes = encodingService.encode(fromEdgeSyncResponse);
|
||||
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setFromEdgeSyncResponseMsg(ByteString.copyFrom(msgBytes)).build();
|
||||
pushEdgeSyncMsgToCore(fromEdgeSyncResponse.getEdgeId(), toCoreMsg);
|
||||
}
|
||||
|
||||
private void pushEdgeSyncMsgToCore(EdgeId edgeId, ToCoreNotificationMsg toCoreMsg) {
|
||||
TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer();
|
||||
Set<String> tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE);
|
||||
for (String serviceId : tbCoreServices) {
|
||||
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, serviceId);
|
||||
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdateMsg(ByteString.copyFrom(msgBytes)).build();
|
||||
toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEdgeId().getId(), toCoreMsg), null);
|
||||
toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(edgeId.getId(), toCoreMsg), null);
|
||||
toCoreNfs.incrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,8 +20,6 @@ import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
@ -76,7 +74,6 @@ import org.thingsboard.server.service.state.DeviceStateService;
|
||||
import org.thingsboard.server.service.subscription.SubscriptionManagerService;
|
||||
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
|
||||
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
|
||||
import org.thingsboard.server.service.sync.vc.EntitiesVersionControlService;
|
||||
import org.thingsboard.server.service.sync.vc.GitVersionControlQueueService;
|
||||
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
|
||||
|
||||
@ -317,13 +314,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
} else if (toCoreNotification.getComponentLifecycleMsg() != null && !toCoreNotification.getComponentLifecycleMsg().isEmpty()) {
|
||||
handleComponentLifecycleMsg(id, toCoreNotification.getComponentLifecycleMsg());
|
||||
callback.onSuccess();
|
||||
} else if (toCoreNotification.getEdgeEventUpdateMsg() != null && !toCoreNotification.getEdgeEventUpdateMsg().isEmpty()) {
|
||||
Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreNotification.getEdgeEventUpdateMsg().toByteArray());
|
||||
if (actorMsg.isPresent()) {
|
||||
log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
|
||||
actorContext.tellWithHighPriority(actorMsg.get());
|
||||
}
|
||||
callback.onSuccess();
|
||||
} else if (!toCoreNotification.getEdgeEventUpdateMsg().isEmpty()) {
|
||||
forwardToAppActor(id, encodingService.decode(toCoreNotification.getEdgeEventUpdateMsg().toByteArray()), callback);
|
||||
} else if (!toCoreNotification.getToEdgeSyncRequestMsg().isEmpty()) {
|
||||
forwardToAppActor(id, encodingService.decode(toCoreNotification.getToEdgeSyncRequestMsg().toByteArray()), callback);
|
||||
} else if (!toCoreNotification.getFromEdgeSyncResponseMsg().isEmpty()) {
|
||||
forwardToAppActor(id, encodingService.decode(toCoreNotification.getFromEdgeSyncResponseMsg().toByteArray()), callback);
|
||||
} else if (toCoreNotification.hasQueueUpdateMsg()) {
|
||||
TransportProtos.QueueUpdateMsg queue = toCoreNotification.getQueueUpdateMsg();
|
||||
partitionService.updateQueue(queue);
|
||||
@ -552,6 +548,14 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
actorContext.tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback));
|
||||
}
|
||||
|
||||
private void forwardToAppActor(UUID id, Optional<TbActorMsg> actorMsg, TbCallback callback) {
|
||||
if (actorMsg.isPresent()) {
|
||||
log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
|
||||
actorContext.tellWithHighPriority(actorMsg.get());
|
||||
}
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
private void throwNotHandled(Object msg, TbCallback callback) {
|
||||
log.warn("Message not handled: {}", msg);
|
||||
callback.onFailure(new RuntimeException("Message not handled!"));
|
||||
|
||||
@ -29,6 +29,8 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg;
|
||||
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
|
||||
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
|
||||
@ -88,5 +90,9 @@ public interface TbClusterService extends TbQueueClusterService {
|
||||
|
||||
void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId);
|
||||
|
||||
void pushEdgeSyncRequestToCore(ToEdgeSyncRequest toEdgeSyncRequest);
|
||||
|
||||
void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse);
|
||||
|
||||
void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action);
|
||||
}
|
||||
|
||||
@ -932,6 +932,8 @@ message ToCoreNotificationMsg {
|
||||
QueueUpdateMsg queueUpdateMsg = 5;
|
||||
QueueDeleteMsg queueDeleteMsg = 6;
|
||||
VersionControlResponseMsg vcResponseMsg = 7;
|
||||
bytes toEdgeSyncRequestMsg = 8;
|
||||
bytes fromEdgeSyncResponseMsg = 9;
|
||||
}
|
||||
|
||||
/* Messages that are handled by ThingsBoard RuleEngine Service */
|
||||
|
||||
@ -122,6 +122,12 @@ public enum MsgType {
|
||||
/**
|
||||
* Message that is sent on Edge Event to Edge Session
|
||||
*/
|
||||
EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG;
|
||||
EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG,
|
||||
|
||||
/**
|
||||
* Messages that are sent to and from edge session to start edge synchronization process
|
||||
*/
|
||||
EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG,
|
||||
EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG;
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,41 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.msg.edge;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.MsgType;
|
||||
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
|
||||
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@AllArgsConstructor
|
||||
@Getter
|
||||
public class FromEdgeSyncResponse implements TenantAwareMsg, ToAllNodesMsg {
|
||||
|
||||
private final UUID id;
|
||||
private final TenantId tenantId;
|
||||
private final EdgeId edgeId;
|
||||
private final boolean success;
|
||||
|
||||
@Override
|
||||
public MsgType getMsgType() {
|
||||
return MsgType.EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,39 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.msg.edge;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.MsgType;
|
||||
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
|
||||
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@AllArgsConstructor
|
||||
@Getter
|
||||
public class ToEdgeSyncRequest implements TenantAwareMsg, ToAllNodesMsg {
|
||||
private final UUID id;
|
||||
private final TenantId tenantId;
|
||||
private final EdgeId edgeId;
|
||||
|
||||
@Override
|
||||
public MsgType getMsgType() {
|
||||
return MsgType.EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG;
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user