Do not push edge notifications in case edges disabled. Check if edges enabled before processing msg by edge consumer

This commit is contained in:
Volodymyr Babak 2025-04-09 12:07:50 +03:00
parent a38d39fda9
commit 79381092b2
2 changed files with 23 additions and 12 deletions

View File

@ -551,11 +551,15 @@ public class DefaultTbClusterService implements TbClusterService {
} }
private void processEdgeNotification(EdgeId edgeId, ToEdgeNotificationMsg toEdgeNotificationMsg) { private void processEdgeNotification(EdgeId edgeId, ToEdgeNotificationMsg toEdgeNotificationMsg) {
var serviceIdOpt = Optional.ofNullable(edgeIdServiceIdCache.get(edgeId)); if (edgesEnabled) {
serviceIdOpt.ifPresentOrElse( var serviceIdOpt = Optional.ofNullable(edgeIdServiceIdCache.get(edgeId));
serviceId -> pushMsgToEdgeNotification(toEdgeNotificationMsg, serviceId.get()), serviceIdOpt.ifPresentOrElse(
() -> broadcastEdgeNotification(edgeId, toEdgeNotificationMsg) serviceId -> pushMsgToEdgeNotification(toEdgeNotificationMsg, serviceId.get()),
); () -> broadcastEdgeNotification(edgeId, toEdgeNotificationMsg)
);
} else {
log.trace("Edges disabled. Ignoring edge notification {} for edgeId: {}", toEdgeNotificationMsg, edgeId);
}
} }
private void pushMsgToEdgeNotification(ToEdgeNotificationMsg toEdgeNotificationMsg, String serviceId) { private void pushMsgToEdgeNotification(ToEdgeNotificationMsg toEdgeNotificationMsg, String serviceId) {

View File

@ -51,6 +51,7 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.EdgeContextComponent; import org.thingsboard.server.service.edge.EdgeContextComponent;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager; import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair; import org.thingsboard.server.service.queue.processing.IdMsgPair;
@ -195,36 +196,42 @@ public class DefaultTbEdgeConsumerService extends AbstractConsumerService<ToEdge
protected void handleNotification(UUID id, TbProtoQueueMsg<ToEdgeNotificationMsg> msg, TbCallback callback) { protected void handleNotification(UUID id, TbProtoQueueMsg<ToEdgeNotificationMsg> msg, TbCallback callback) {
ToEdgeNotificationMsg toEdgeNotificationMsg = msg.getValue(); ToEdgeNotificationMsg toEdgeNotificationMsg = msg.getValue();
try { try {
EdgeRpcService edgeRpcService = edgeCtx.getEdgeRpcService();
if (edgeRpcService == null) {
log.debug("No EdgeRpcService available (edge functionality disabled), ignoring msg: {}", toEdgeNotificationMsg);
callback.onSuccess();
return;
}
if (toEdgeNotificationMsg.hasEdgeHighPriority()) { if (toEdgeNotificationMsg.hasEdgeHighPriority()) {
EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getEdgeHighPriority()); EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getEdgeHighPriority());
edgeCtx.getEdgeRpcService().onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg);
callback.onSuccess(); callback.onSuccess();
} else if (toEdgeNotificationMsg.hasEdgeEventUpdate()) { } else if (toEdgeNotificationMsg.hasEdgeEventUpdate()) {
EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getEdgeEventUpdate()); EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getEdgeEventUpdate());
edgeCtx.getEdgeRpcService().onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg);
callback.onSuccess(); callback.onSuccess();
} else if (toEdgeNotificationMsg.hasToEdgeSyncRequest()) { } else if (toEdgeNotificationMsg.hasToEdgeSyncRequest()) {
EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getToEdgeSyncRequest()); EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getToEdgeSyncRequest());
edgeCtx.getEdgeRpcService().onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg);
callback.onSuccess(); callback.onSuccess();
} else if (toEdgeNotificationMsg.hasFromEdgeSyncResponse()) { } else if (toEdgeNotificationMsg.hasFromEdgeSyncResponse()) {
EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getFromEdgeSyncResponse()); EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getFromEdgeSyncResponse());
edgeCtx.getEdgeRpcService().onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg);
callback.onSuccess(); callback.onSuccess();
} else if (toEdgeNotificationMsg.hasComponentLifecycle()) { } else if (toEdgeNotificationMsg.hasComponentLifecycle()) {
ComponentLifecycleMsg componentLifecycle = ProtoUtils.fromProto(toEdgeNotificationMsg.getComponentLifecycle()); ComponentLifecycleMsg componentLifecycle = ProtoUtils.fromProto(toEdgeNotificationMsg.getComponentLifecycle());
TenantId tenantId = componentLifecycle.getTenantId(); TenantId tenantId = componentLifecycle.getTenantId();
EdgeId edgeId = new EdgeId(componentLifecycle.getEntityId().getId()); EdgeId edgeId = new EdgeId(componentLifecycle.getEntityId().getId());
if (ComponentLifecycleEvent.DELETED.equals(componentLifecycle.getEvent())) { if (ComponentLifecycleEvent.DELETED.equals(componentLifecycle.getEvent())) {
edgeCtx.getEdgeRpcService().deleteEdge(tenantId, edgeId); edgeRpcService.deleteEdge(tenantId, edgeId);
} else if (ComponentLifecycleEvent.UPDATED.equals(componentLifecycle.getEvent())) { } else if (ComponentLifecycleEvent.UPDATED.equals(componentLifecycle.getEvent())) {
Edge edge = edgeCtx.getEdgeService().findEdgeById(tenantId, edgeId); Edge edge = edgeCtx.getEdgeService().findEdgeById(tenantId, edgeId);
edgeCtx.getEdgeRpcService().updateEdge(tenantId, edge); edgeRpcService.updateEdge(tenantId, edge);
} }
callback.onSuccess(); callback.onSuccess();
} }
} catch (Exception e) { } catch (Exception e) {
log.error("Error processing edge notification message", e); log.error("Error processing edge notification message {}", toEdgeNotificationMsg, e);
callback.onFailure(e); callback.onFailure(e);
} }