Added deduplication support for gateway
This commit is contained in:
		
							parent
							
								
									0bdf246383
								
							
						
					
					
						commit
						ba348dd997
					
				@ -1,12 +1,12 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2017 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * <p>
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * <p>
 | 
			
		||||
 * http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 * <p>
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
@ -72,17 +72,19 @@ public class GatewaySessionCtx {
 | 
			
		||||
 | 
			
		||||
    public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
 | 
			
		||||
        String deviceName = checkDeviceName(getDeviceName(msg));
 | 
			
		||||
        Optional<Device> deviceOpt = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);
 | 
			
		||||
        Device device = deviceOpt.orElseGet(() -> {
 | 
			
		||||
            Device newDevice = new Device();
 | 
			
		||||
            newDevice.setTenantId(gateway.getTenantId());
 | 
			
		||||
            newDevice.setName(deviceName);
 | 
			
		||||
            return deviceService.saveDevice(newDevice);
 | 
			
		||||
        });
 | 
			
		||||
        GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device);
 | 
			
		||||
        devices.put(deviceName, ctx);
 | 
			
		||||
        processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
 | 
			
		||||
        processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
 | 
			
		||||
        if (!devices.containsKey(deviceName)) {
 | 
			
		||||
            Optional<Device> deviceOpt = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);
 | 
			
		||||
            Device device = deviceOpt.orElseGet(() -> {
 | 
			
		||||
                Device newDevice = new Device();
 | 
			
		||||
                newDevice.setTenantId(gateway.getTenantId());
 | 
			
		||||
                newDevice.setName(deviceName);
 | 
			
		||||
                return deviceService.saveDevice(newDevice);
 | 
			
		||||
            });
 | 
			
		||||
            GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device);
 | 
			
		||||
            devices.put(deviceName, ctx);
 | 
			
		||||
            processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
 | 
			
		||||
            processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
 | 
			
		||||
        }
 | 
			
		||||
        ack(msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user