Merge pull request #9946 from AndriiLandiak/fix/edge-improve-push-device-message
Edge - improve Device Actor to detect edge relations on 'UNASSIGNED_FROM_EDGE'
This commit is contained in:
		
						commit
						4449ac4bb0
					
				@ -29,7 +29,6 @@ import org.springframework.data.redis.core.RedisTemplate;
 | 
			
		||||
import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider;
 | 
			
		||||
import org.thingsboard.rule.engine.api.MailService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.NotificationCenter;
 | 
			
		||||
import org.thingsboard.rule.engine.api.SmsService;
 | 
			
		||||
@ -101,6 +100,7 @@ import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
 | 
			
		||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
 | 
			
		||||
import org.thingsboard.server.service.executors.ExternalCallExecutorService;
 | 
			
		||||
import org.thingsboard.server.service.executors.NotificationExecutorService;
 | 
			
		||||
import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider;
 | 
			
		||||
import org.thingsboard.server.service.executors.SharedEventLoopGroupService;
 | 
			
		||||
import org.thingsboard.server.service.mail.MailExecutorService;
 | 
			
		||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
 | 
			
		||||
 | 
			
		||||
@ -25,10 +25,6 @@ import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.commons.collections.CollectionUtils;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.common.util.LinkedHashMapRemoveEldest;
 | 
			
		||||
import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.actors.ActorSystemContext;
 | 
			
		||||
import org.thingsboard.server.actors.TbActorCtx;
 | 
			
		||||
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
 | 
			
		||||
@ -61,7 +57,14 @@ import org.thingsboard.server.common.msg.TbActorMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TbCallback;
 | 
			
		||||
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
 | 
			
		||||
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.rpc.RemoveRpcActorMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 | 
			
		||||
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
 | 
			
		||||
@ -87,10 +90,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
 | 
			
		||||
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.rpc.RemoveRpcActorMsg;
 | 
			
		||||
import org.thingsboard.server.service.rpc.RpcSubmitStrategy;
 | 
			
		||||
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg;
 | 
			
		||||
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.Nullable;
 | 
			
		||||
@ -173,7 +173,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 | 
			
		||||
 | 
			
		||||
    private EdgeId findRelatedEdgeId() {
 | 
			
		||||
        List<EntityRelation> result =
 | 
			
		||||
                systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.EDGE_TYPE, RelationTypeGroup.COMMON);
 | 
			
		||||
                systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON);
 | 
			
		||||
        if (result != null && result.size() > 0) {
 | 
			
		||||
            EntityRelation relationToEdge = result.get(0);
 | 
			
		||||
            if (relationToEdge.getFrom() != null && relationToEdge.getFrom().getId() != null) {
 | 
			
		||||
 | 
			
		||||
@ -44,6 +44,8 @@ import org.thingsboard.server.common.data.id.EdgeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleChainId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
 | 
			
		||||
import org.thingsboard.server.common.data.queue.Queue;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
@ -57,6 +59,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
 | 
			
		||||
import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.dao.edge.EdgeService;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
 | 
			
		||||
@ -68,8 +71,8 @@ import org.thingsboard.server.queue.TbQueueCallback;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.common.MultipleTbQueueCallbackWrapper;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TopicService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TopicService;
 | 
			
		||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
 | 
			
		||||
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
 | 
			
		||||
import org.thingsboard.server.service.gateway_device.GatewayNotificationsService;
 | 
			
		||||
@ -77,6 +80,7 @@ import org.thingsboard.server.service.ota.OtaPackageStateService;
 | 
			
		||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
 | 
			
		||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
 | 
			
		||||
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
@ -116,6 +120,7 @@ public class DefaultTbClusterService implements TbClusterService {
 | 
			
		||||
    private final TbDeviceProfileCache deviceProfileCache;
 | 
			
		||||
    private final TbAssetProfileCache assetProfileCache;
 | 
			
		||||
    private final GatewayNotificationsService gatewayNotificationsService;
 | 
			
		||||
    private final EdgeService edgeService;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void pushMsgToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, TbQueueCallback callback) {
 | 
			
		||||
@ -546,11 +551,17 @@ public class DefaultTbClusterService implements TbClusterService {
 | 
			
		||||
                pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), edgeId), null);
 | 
			
		||||
                break;
 | 
			
		||||
            case UNASSIGNED_FROM_EDGE:
 | 
			
		||||
                pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), null), null);
 | 
			
		||||
                EdgeId relatedEdgeId = findRelatedEdgeIdIfAny(tenantId, entityId);
 | 
			
		||||
                pushMsgToCore(new DeviceEdgeUpdateMsg(tenantId, new DeviceId(entityId.getId()), relatedEdgeId), null);
 | 
			
		||||
                break;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private EdgeId findRelatedEdgeIdIfAny(TenantId tenantId, EntityId entityId) {
 | 
			
		||||
        PageData<EdgeId> pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, new PageLink(1));
 | 
			
		||||
        return Optional.ofNullable(pageData).filter(pd -> pd.getTotalElements() > 0).map(pd -> pd.getData().get(0)).orElse(null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onQueueChange(Queue queue) {
 | 
			
		||||
        log.trace("[{}][{}] Processing queue change [{}] event", queue.getTenantId(), queue.getId(), queue.getName());
 | 
			
		||||
 | 
			
		||||
@ -25,8 +25,10 @@ import io.netty.handler.codec.mqtt.MqttQoS;
 | 
			
		||||
import org.awaitility.Awaitility;
 | 
			
		||||
import org.junit.Assert;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.test.context.TestPropertySource;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.server.common.adaptor.JsonConverter;
 | 
			
		||||
import org.thingsboard.server.common.data.Customer;
 | 
			
		||||
import org.thingsboard.server.common.data.DataConstants;
 | 
			
		||||
import org.thingsboard.server.common.data.Device;
 | 
			
		||||
@ -44,15 +46,17 @@ import org.thingsboard.server.common.data.edge.EdgeEventActionType;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CustomerId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EdgeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.ota.OtaPackageType;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.page.TimePageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.security.DeviceCredentials;
 | 
			
		||||
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
 | 
			
		||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.FeatureType;
 | 
			
		||||
import org.thingsboard.server.common.adaptor.JsonConverter;
 | 
			
		||||
import org.thingsboard.server.dao.edge.EdgeService;
 | 
			
		||||
import org.thingsboard.server.dao.service.DaoSqlTest;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsRequestMsg;
 | 
			
		||||
@ -75,6 +79,7 @@ import java.util.Optional;
 | 
			
		||||
import java.util.Random;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicReference;
 | 
			
		||||
 | 
			
		||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 | 
			
		||||
 | 
			
		||||
@ -86,6 +91,9 @@ public class DeviceEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
 | 
			
		||||
    private static final String DEFAULT_DEVICE_TYPE = "default";
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    protected EdgeService edgeService;
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testDevices() throws Exception {
 | 
			
		||||
        // create device and assign to edge; update device
 | 
			
		||||
@ -769,6 +777,54 @@ public class DeviceEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testVerifyProcessCorrectEdgeUpdateToDeviceActorOnUnassignFromDifferentEdge() throws Exception {
 | 
			
		||||
        Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge();
 | 
			
		||||
 | 
			
		||||
        // assign device to another edge
 | 
			
		||||
        Edge tmpEdge = doPost("/api/edge", constructEdge("Test Tmp Edge", "test"), Edge.class);
 | 
			
		||||
        doPost("/api/edge/" + tmpEdge.getUuidId()
 | 
			
		||||
                + "/device/" + device.getUuidId(), Device.class);
 | 
			
		||||
        List<EdgeId> relatedEdgeIds = edgeService.findAllRelatedEdgeIds(tenantId, device.getId());
 | 
			
		||||
        Assert.assertEquals(2, relatedEdgeIds.size());
 | 
			
		||||
 | 
			
		||||
        // unassign device from edge
 | 
			
		||||
        doDelete("/api/edge/" + edge.getUuidId()
 | 
			
		||||
                + "/device/" + device.getUuidId(), Device.class);
 | 
			
		||||
        relatedEdgeIds = edgeService.findAllRelatedEdgeIds(tenantId, device.getId());
 | 
			
		||||
        Assert.assertEquals(1, relatedEdgeIds.size());
 | 
			
		||||
        Assert.assertEquals(tmpEdge.getId(), relatedEdgeIds.get(0));
 | 
			
		||||
 | 
			
		||||
        // clean up stored edge events
 | 
			
		||||
        edgeEventService.cleanupEvents(1);
 | 
			
		||||
 | 
			
		||||
        // perform rpc call to verify edgeId in DeviceActorMessageProcessor updated properly
 | 
			
		||||
        doPostAsync(
 | 
			
		||||
                "/api/rpc/oneway/" + device.getId().getId().toString(),
 | 
			
		||||
                JacksonUtil.toString(createDefaultRpc()),
 | 
			
		||||
                String.class,
 | 
			
		||||
                status().isOk());
 | 
			
		||||
 | 
			
		||||
        final AtomicReference<PageData<EdgeEvent>> resultRef = new AtomicReference<>();
 | 
			
		||||
        Awaitility.await()
 | 
			
		||||
                .atMost(TIMEOUT, TimeUnit.SECONDS)
 | 
			
		||||
                .until(() -> {
 | 
			
		||||
                    PageData<EdgeEvent> result = edgeEventService.findEdgeEvents(tenantId, tmpEdge.getId(), 0L, null, new TimePageLink(1));
 | 
			
		||||
                    resultRef.set(result);
 | 
			
		||||
                    return result != null && result.getData().size() == 1;
 | 
			
		||||
                });
 | 
			
		||||
 | 
			
		||||
        PageData<EdgeEvent> result = resultRef.get();
 | 
			
		||||
        EdgeEvent edgeEvent = result.getData().get(0);
 | 
			
		||||
        Assert.assertEquals(EdgeEventActionType.RPC_CALL, edgeEvent.getAction());
 | 
			
		||||
        Assert.assertEquals(EdgeEventType.DEVICE, edgeEvent.getType());
 | 
			
		||||
        Assert.assertEquals(tmpEdge.getId(), edgeEvent.getEdgeId());
 | 
			
		||||
        Assert.assertEquals(device.getId().getId(), edgeEvent.getEntityId());
 | 
			
		||||
 | 
			
		||||
        // clean up tmp edge
 | 
			
		||||
        doDelete("/api/edge/" + tmpEdge.getId().getId().toString()).andExpect(status().isOk());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Device buildDeviceForUplinkMsg(String name, String type) {
 | 
			
		||||
        Device device = new Device();
 | 
			
		||||
        device.setId(new DeviceId(UUID.randomUUID()));
 | 
			
		||||
@ -778,7 +834,6 @@ public class DeviceEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        return device;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    private DeviceCredentials buildDeviceCredentialsForUplinkMsg(DeviceId deviceId) {
 | 
			
		||||
        DeviceCredentials deviceCredentials = new DeviceCredentials();
 | 
			
		||||
        deviceCredentials.setDeviceId(deviceId);
 | 
			
		||||
@ -786,4 +841,20 @@ public class DeviceEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN);
 | 
			
		||||
        return deviceCredentials;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ObjectNode createDefaultRpc() {
 | 
			
		||||
        ObjectNode rpc = JacksonUtil.newObjectNode();
 | 
			
		||||
        rpc.put("method", "setGpio");
 | 
			
		||||
 | 
			
		||||
        ObjectNode params = JacksonUtil.newObjectNode();
 | 
			
		||||
 | 
			
		||||
        params.put("pin", 7);
 | 
			
		||||
        params.put("value", 1);
 | 
			
		||||
 | 
			
		||||
        rpc.set("params", params);
 | 
			
		||||
        rpc.put("persistent", true);
 | 
			
		||||
        rpc.put("timeout", 5000);
 | 
			
		||||
 | 
			
		||||
        return rpc;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -29,11 +29,12 @@ import org.thingsboard.server.common.data.id.QueueId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.queue.Queue;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
			
		||||
import org.thingsboard.server.dao.edge.EdgeService;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TopicService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TopicService;
 | 
			
		||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
 | 
			
		||||
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
 | 
			
		||||
import org.thingsboard.server.service.gateway_device.GatewayNotificationsService;
 | 
			
		||||
@ -73,6 +74,8 @@ public class DefaultTbClusterServiceTest {
 | 
			
		||||
    @MockBean
 | 
			
		||||
    protected GatewayNotificationsService gatewayNotificationsService;
 | 
			
		||||
    @MockBean
 | 
			
		||||
    protected EdgeService edgeService;
 | 
			
		||||
    @MockBean
 | 
			
		||||
    protected PartitionService partitionService;
 | 
			
		||||
    @MockBean
 | 
			
		||||
    protected TbQueueProducerProvider producerProvider;
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user