send resourse change/delete events only for lwm2m
This commit is contained in:
		
							parent
							
								
									4249ad8934
								
							
						
					
					
						commit
						1d79f1e213
					
				@ -24,12 +24,14 @@ import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.cluster.TbClusterService;
 | 
			
		||||
import org.thingsboard.server.common.data.ApiUsageState;
 | 
			
		||||
import org.thingsboard.server.common.data.DataConstants;
 | 
			
		||||
import org.thingsboard.server.common.data.Device;
 | 
			
		||||
import org.thingsboard.server.common.data.DeviceProfile;
 | 
			
		||||
import org.thingsboard.server.common.data.EdgeUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.HasName;
 | 
			
		||||
import org.thingsboard.server.common.data.HasRuleEngineProfile;
 | 
			
		||||
import org.thingsboard.server.common.data.ResourceType;
 | 
			
		||||
import org.thingsboard.server.common.data.TbResourceInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.Tenant;
 | 
			
		||||
import org.thingsboard.server.common.data.TenantProfile;
 | 
			
		||||
@ -340,29 +342,33 @@ public class DefaultTbClusterService implements TbClusterService {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onResourceChange(TbResourceInfo resource, TbQueueCallback callback) {
 | 
			
		||||
        TenantId tenantId = resource.getTenantId();
 | 
			
		||||
        log.trace("[{}][{}][{}] Processing change resource", tenantId, resource.getResourceType(), resource.getResourceKey());
 | 
			
		||||
        TransportProtos.ResourceUpdateMsg resourceUpdateMsg = TransportProtos.ResourceUpdateMsg.newBuilder()
 | 
			
		||||
                .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
 | 
			
		||||
                .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
 | 
			
		||||
                .setResourceType(resource.getResourceType().name())
 | 
			
		||||
                .setResourceKey(resource.getResourceKey())
 | 
			
		||||
                .build();
 | 
			
		||||
        ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setResourceUpdateMsg(resourceUpdateMsg).build();
 | 
			
		||||
        broadcast(transportMsg, callback);
 | 
			
		||||
        if (resource.getResourceType() == ResourceType.LWM2M_MODEL) {
 | 
			
		||||
            TenantId tenantId = resource.getTenantId();
 | 
			
		||||
            log.trace("[{}][{}][{}] Processing change resource", tenantId, resource.getResourceType(), resource.getResourceKey());
 | 
			
		||||
            TransportProtos.ResourceUpdateMsg resourceUpdateMsg = TransportProtos.ResourceUpdateMsg.newBuilder()
 | 
			
		||||
                    .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
 | 
			
		||||
                    .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
 | 
			
		||||
                    .setResourceType(resource.getResourceType().name())
 | 
			
		||||
                    .setResourceKey(resource.getResourceKey())
 | 
			
		||||
                    .build();
 | 
			
		||||
            ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setResourceUpdateMsg(resourceUpdateMsg).build();
 | 
			
		||||
            broadcast(transportMsg, DataConstants.LWM2M_TRANSPORT_NAME, callback);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onResourceDeleted(TbResourceInfo resource, TbQueueCallback callback) {
 | 
			
		||||
        log.trace("[{}] Processing delete resource", resource);
 | 
			
		||||
        TransportProtos.ResourceDeleteMsg resourceUpdateMsg = TransportProtos.ResourceDeleteMsg.newBuilder()
 | 
			
		||||
                .setTenantIdMSB(resource.getTenantId().getId().getMostSignificantBits())
 | 
			
		||||
                .setTenantIdLSB(resource.getTenantId().getId().getLeastSignificantBits())
 | 
			
		||||
                .setResourceType(resource.getResourceType().name())
 | 
			
		||||
                .setResourceKey(resource.getResourceKey())
 | 
			
		||||
                .build();
 | 
			
		||||
        ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setResourceDeleteMsg(resourceUpdateMsg).build();
 | 
			
		||||
        broadcast(transportMsg, callback);
 | 
			
		||||
        if (resource.getResourceType() == ResourceType.LWM2M_MODEL) {
 | 
			
		||||
            log.trace("[{}] Processing delete resource", resource);
 | 
			
		||||
            TransportProtos.ResourceDeleteMsg resourceUpdateMsg = TransportProtos.ResourceDeleteMsg.newBuilder()
 | 
			
		||||
                    .setTenantIdMSB(resource.getTenantId().getId().getMostSignificantBits())
 | 
			
		||||
                    .setTenantIdLSB(resource.getTenantId().getId().getLeastSignificantBits())
 | 
			
		||||
                    .setResourceType(resource.getResourceType().name())
 | 
			
		||||
                    .setResourceKey(resource.getResourceKey())
 | 
			
		||||
                    .build();
 | 
			
		||||
            ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setResourceDeleteMsg(resourceUpdateMsg).build();
 | 
			
		||||
            broadcast(transportMsg, DataConstants.LWM2M_TRANSPORT_NAME, callback);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private <T> void broadcastEntityChangeToTransport(TenantId tenantId, EntityId entityid, T entity, TbQueueCallback callback) {
 | 
			
		||||
@ -394,6 +400,20 @@ public class DefaultTbClusterService implements TbClusterService {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void broadcast(ToTransportMsg transportMsg, String transportType, TbQueueCallback callback) {
 | 
			
		||||
        TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer();
 | 
			
		||||
        Set<TransportProtos.ServiceInfo> tbTransportInfos = partitionService.getAllServices(ServiceType.TB_TRANSPORT);
 | 
			
		||||
        TbQueueCallback proxyCallback = callback != null ? new MultipleTbQueueCallbackWrapper(tbTransportInfos.size(), callback) : null;
 | 
			
		||||
        tbTransportInfos.stream()
 | 
			
		||||
                .filter(info -> info.getTransportsList().contains(transportType))
 | 
			
		||||
                .map(TransportProtos.ServiceInfo::getServiceId)
 | 
			
		||||
                .forEach(transportServiceId -> {
 | 
			
		||||
                    TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId);
 | 
			
		||||
                    toTransportNfProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), proxyCallback);
 | 
			
		||||
                    toTransportNfs.incrementAndGet();
 | 
			
		||||
                });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) {
 | 
			
		||||
        log.trace("[{}] Processing edge {} event update ", tenantId, edgeId);
 | 
			
		||||
 | 
			
		||||
@ -931,7 +931,7 @@ public class DefaultTransportService extends TransportActivityManager implements
 | 
			
		||||
                    rateLimitService.remove(new DeviceId(entityUuid));
 | 
			
		||||
                    onDeviceDeleted(new DeviceId(entityUuid));
 | 
			
		||||
                }
 | 
			
		||||
            } else if (toSessionMsg.hasResourceUpdateMsg()) {
 | 
			
		||||
            } else if (toSessionMsg.hasResourceUpdateMsg() && serviceInfoProvider.getServiceInfo().getTransportsList().contains(DataConstants.LWM2M_TRANSPORT_NAME)) {
 | 
			
		||||
                TransportProtos.ResourceUpdateMsg msg = toSessionMsg.getResourceUpdateMsg();
 | 
			
		||||
                TenantId tenantId = TenantId.fromUUID(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB()));
 | 
			
		||||
                ResourceType resourceType = ResourceType.valueOf(msg.getResourceType());
 | 
			
		||||
@ -941,14 +941,14 @@ public class DefaultTransportService extends TransportActivityManager implements
 | 
			
		||||
                    log.trace("ResourceUpdate - [{}] [{}]", id, mdRez);
 | 
			
		||||
                    transportCallbackExecutor.submit(() -> mdRez.getListener().onResourceUpdate(msg));
 | 
			
		||||
                });
 | 
			
		||||
            } else if (toSessionMsg.hasResourceDeleteMsg()) {
 | 
			
		||||
            } else if (toSessionMsg.hasResourceDeleteMsg() && serviceInfoProvider.getServiceInfo().getTransportsList().contains(DataConstants.LWM2M_TRANSPORT_NAME)) {
 | 
			
		||||
                TransportProtos.ResourceDeleteMsg msg = toSessionMsg.getResourceDeleteMsg();
 | 
			
		||||
                TenantId tenantId = TenantId.fromUUID(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB()));
 | 
			
		||||
                ResourceType resourceType = ResourceType.valueOf(msg.getResourceType());
 | 
			
		||||
                String resourceId = msg.getResourceKey();
 | 
			
		||||
                transportResourceCache.evict(tenantId, resourceType, resourceId);
 | 
			
		||||
                sessions.forEach((id, mdRez) -> {
 | 
			
		||||
                    log.warn("ResourceDelete - [{}] [{}]", id, mdRez);
 | 
			
		||||
                    log.trace("ResourceDelete - [{}] [{}]", id, mdRez);
 | 
			
		||||
                    transportCallbackExecutor.submit(() -> mdRez.getListener().onResourceDelete(msg));
 | 
			
		||||
                });
 | 
			
		||||
            } else if (toSessionMsg.getQueueUpdateMsgsCount() > 0) {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user