diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 09dcf3ca1e..2dc73c51c7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -29,7 +29,6 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.SmsService; @@ -101,6 +100,7 @@ import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.executors.ExternalCallExecutorService; import org.thingsboard.server.service.executors.NotificationExecutorService; +import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.server.service.executors.SharedEventLoopGroupService; import org.thingsboard.server.service.mail.MailExecutorService; import org.thingsboard.server.service.profile.TbAssetProfileCache; diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 50e237ac73..aee8a33ab7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -25,10 +25,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.LinkedHashMapRemoveEldest; -import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; @@ -61,7 +57,14 @@ import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg; +import org.thingsboard.server.common.msg.rpc.RemoveRpcActorMsg; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; @@ -87,10 +90,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; -import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg; -import org.thingsboard.server.common.msg.rpc.RemoveRpcActorMsg; import org.thingsboard.server.service.rpc.RpcSubmitStrategy; -import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; import javax.annotation.Nullable; @@ -173,7 +173,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private EdgeId findRelatedEdgeId() { List result = - systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.EDGE_TYPE, RelationTypeGroup.COMMON); + systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON); if (result != null && result.size() > 0) { EntityRelation relationToEdge = result.get(0); if (relationToEdge.getFrom() != null && relationToEdge.getFrom().getId() != null) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 14930912d1..cfa87c0508 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -44,6 +44,8 @@ import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.TbMsg; @@ -57,6 +59,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; +import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -68,8 +71,8 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.MultipleTbQueueCallbackWrapper; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; @@ -77,6 +80,7 @@ import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -116,6 +120,7 @@ public class DefaultTbClusterService implements TbClusterService { private final TbDeviceProfileCache deviceProfileCache; private final TbAssetProfileCache assetProfileCache; private final GatewayNotificationsService gatewayNotificationsService; + private final EdgeService edgeService; @Override public void pushMsgToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, TbQueueCallback callback) { @@ -546,11 +551,17 @@ public class DefaultTbClusterService implements TbClusterService { pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), edgeId), null); break; case UNASSIGNED_FROM_EDGE: - pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), null), null); + EdgeId relatedEdgeId = findRelatedEdgeEdgeIdIfAny(tenantId, entityId); + pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), relatedEdgeId), null); break; } } + private EdgeId findRelatedEdgeEdgeIdIfAny(TenantId tenantId, EntityId entityId) { + PageData pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, new PageLink(1)); + return Optional.ofNullable(pageData).filter(pd -> pd.getTotalElements() == 1).map(pd -> pd.getData().get(0)).orElse(null); + } + @Override public void onQueueChange(Queue queue) { log.trace("[{}][{}] Processing queue change [{}] event", queue.getTenantId(), queue.getId(), queue.getName());