Merge remote-tracking branch 'remotes/origin/master' into feature/api-limits
This commit is contained in:
		
						commit
						dbd4d2dd9d
					
				@ -66,6 +66,8 @@ public class CassandraDatabaseUpgradeService implements DatabaseUpgradeService {
 | 
			
		||||
 | 
			
		||||
                //Dump devices, assets and relations
 | 
			
		||||
 | 
			
		||||
                cluster.getSession();
 | 
			
		||||
 | 
			
		||||
                KeyspaceMetadata ks = cluster.getCluster().getMetadata().getKeyspace(cluster.getKeyspaceName());
 | 
			
		||||
 | 
			
		||||
                log.info("Dumping devices ...");
 | 
			
		||||
 | 
			
		||||
@ -77,6 +77,8 @@ http:
 | 
			
		||||
 | 
			
		||||
# MQTT server parameters
 | 
			
		||||
mqtt:
 | 
			
		||||
  # Enable/disable mqtt transport protocol.
 | 
			
		||||
  enabled: "${MQTT_ENABLED:true}"
 | 
			
		||||
  bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}"
 | 
			
		||||
  bind_port: "${MQTT_BIND_PORT:1883}"
 | 
			
		||||
  adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
 | 
			
		||||
@ -102,6 +104,8 @@ mqtt:
 | 
			
		||||
 | 
			
		||||
# CoAP server parameters
 | 
			
		||||
coap:
 | 
			
		||||
  # Enable/disable coap transport protocol.
 | 
			
		||||
  enabled: "${COAP_ENABLED:true}"
 | 
			
		||||
  bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"
 | 
			
		||||
  bind_port: "${COAP_BIND_PORT:5683}"
 | 
			
		||||
  adaptor:  "${COAP_ADAPTOR_NAME:JsonCoapAdaptor}"
 | 
			
		||||
@ -229,6 +233,18 @@ cache:
 | 
			
		||||
      policy: "${CACHE_DEVICE_CREDENTIAL_MAX_SIZE_POLICY:PER_NODE}"
 | 
			
		||||
      size: "${CACHE_DEVICE_CREDENTIAL_MAX_SIZE_SIZE:1000000}"
 | 
			
		||||
 | 
			
		||||
caching:
 | 
			
		||||
  specs:
 | 
			
		||||
    relations:
 | 
			
		||||
      timeToLiveInMinutes: 1440
 | 
			
		||||
      maxSize: 100000
 | 
			
		||||
    deviceCredentials:
 | 
			
		||||
      timeToLiveInMinutes: 1440
 | 
			
		||||
      maxSize: 100000
 | 
			
		||||
    devices:
 | 
			
		||||
      timeToLiveInMinutes: 1440
 | 
			
		||||
      maxSize: 100000
 | 
			
		||||
 | 
			
		||||
# Check new version updates parameters
 | 
			
		||||
updates:
 | 
			
		||||
  # Enable/disable updates checking.
 | 
			
		||||
 | 
			
		||||
@ -17,4 +17,6 @@ package org.thingsboard.server.common.data;
 | 
			
		||||
 | 
			
		||||
public class CacheConstants {
 | 
			
		||||
    public static final String DEVICE_CREDENTIALS_CACHE = "deviceCredentials";
 | 
			
		||||
    public static final String RELATIONS_CACHE = "relations";
 | 
			
		||||
    public static final String DEVICE_CACHE = "devices";
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -148,6 +148,10 @@
 | 
			
		||||
            <groupId>com.hazelcast</groupId>
 | 
			
		||||
            <artifactId>hazelcast</artifactId>
 | 
			
		||||
        </dependency>
 | 
			
		||||
        <dependency>
 | 
			
		||||
            <groupId>com.github.ben-manes.caffeine</groupId>
 | 
			
		||||
            <artifactId>caffeine</artifactId>
 | 
			
		||||
        </dependency>
 | 
			
		||||
        <dependency>
 | 
			
		||||
            <groupId>com.hazelcast</groupId>
 | 
			
		||||
            <artifactId>hazelcast-spring</artifactId>
 | 
			
		||||
@ -174,6 +178,10 @@
 | 
			
		||||
            <artifactId>hsqldb</artifactId>
 | 
			
		||||
            <scope>test</scope>
 | 
			
		||||
        </dependency>
 | 
			
		||||
        <dependency>
 | 
			
		||||
            <groupId>org.springframework</groupId>
 | 
			
		||||
            <artifactId>spring-context-support</artifactId>
 | 
			
		||||
        </dependency>
 | 
			
		||||
    </dependencies>
 | 
			
		||||
    <build>
 | 
			
		||||
        <plugins>
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										24
									
								
								dao/src/main/java/org/thingsboard/server/dao/cache/CacheSpecs.java
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										24
									
								
								dao/src/main/java/org/thingsboard/server/dao/cache/CacheSpecs.java
									
									
									
									
										vendored
									
									
										Normal file
									
								
							@ -0,0 +1,24 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2017 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.cache;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
public class CacheSpecs {
 | 
			
		||||
    private Integer timeToLiveInMinutes;
 | 
			
		||||
    private Integer maxSize;
 | 
			
		||||
}
 | 
			
		||||
@ -23,6 +23,8 @@ import java.lang.reflect.Method;
 | 
			
		||||
 | 
			
		||||
public class PreviousDeviceCredentialsIdKeyGenerator implements KeyGenerator {
 | 
			
		||||
 | 
			
		||||
    private static final String NOT_VALID_DEVICE = "notValidDeviceCredentialsId";
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public Object generate(Object o, Method method, Object... objects) {
 | 
			
		||||
        DeviceCredentialsService deviceCredentialsService = (DeviceCredentialsService) o;
 | 
			
		||||
@ -33,6 +35,6 @@ public class PreviousDeviceCredentialsIdKeyGenerator implements KeyGenerator {
 | 
			
		||||
                return oldDeviceCredentials.getCredentialsId();
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return null;
 | 
			
		||||
        return NOT_VALID_DEVICE;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -15,76 +15,57 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.cache;
 | 
			
		||||
 | 
			
		||||
import com.hazelcast.config.*;
 | 
			
		||||
import com.hazelcast.core.Hazelcast;
 | 
			
		||||
import com.hazelcast.core.HazelcastInstance;
 | 
			
		||||
import com.hazelcast.instance.GroupProperty;
 | 
			
		||||
import com.hazelcast.spring.cache.HazelcastCacheManager;
 | 
			
		||||
import com.hazelcast.zookeeper.ZookeeperDiscoveryProperties;
 | 
			
		||||
import com.hazelcast.zookeeper.ZookeeperDiscoveryStrategyFactory;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 | 
			
		||||
import com.github.benmanes.caffeine.cache.Caffeine;
 | 
			
		||||
import com.github.benmanes.caffeine.cache.Ticker;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.springframework.boot.context.properties.ConfigurationProperties;
 | 
			
		||||
import org.springframework.cache.CacheManager;
 | 
			
		||||
import org.springframework.cache.annotation.EnableCaching;
 | 
			
		||||
import org.springframework.cache.caffeine.CaffeineCache;
 | 
			
		||||
import org.springframework.cache.interceptor.KeyGenerator;
 | 
			
		||||
import org.springframework.cache.support.SimpleCacheManager;
 | 
			
		||||
import org.springframework.context.annotation.Bean;
 | 
			
		||||
import org.springframework.context.annotation.Configuration;
 | 
			
		||||
import org.thingsboard.server.common.data.CacheConstants;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
@Configuration
 | 
			
		||||
@ConfigurationProperties(prefix = "caching")
 | 
			
		||||
@EnableCaching
 | 
			
		||||
@ConditionalOnProperty(prefix = "cache", value = "enabled", havingValue = "true")
 | 
			
		||||
@Data
 | 
			
		||||
public class ServiceCacheConfiguration {
 | 
			
		||||
 | 
			
		||||
    private static final String HAZELCAST_CLUSTER_NAME = "hazelcast";
 | 
			
		||||
 | 
			
		||||
    @Value("${cache.device_credentials.max_size.size}")
 | 
			
		||||
    private Integer cacheDeviceCredentialsMaxSizeSize;
 | 
			
		||||
    @Value("${cache.device_credentials.max_size.policy}")
 | 
			
		||||
    private String cacheDeviceCredentialsMaxSizePolicy;
 | 
			
		||||
    @Value("${cache.device_credentials.time_to_live}")
 | 
			
		||||
    private Integer cacheDeviceCredentialsTTL;
 | 
			
		||||
 | 
			
		||||
    @Value("${zk.enabled}")
 | 
			
		||||
    private boolean zkEnabled;
 | 
			
		||||
    @Value("${zk.url}")
 | 
			
		||||
    private String zkUrl;
 | 
			
		||||
    @Value("${zk.zk_dir}")
 | 
			
		||||
    private String zkDir;
 | 
			
		||||
    private Map<String, CacheSpecs> specs;
 | 
			
		||||
 | 
			
		||||
    @Bean
 | 
			
		||||
    public HazelcastInstance hazelcastInstance() {
 | 
			
		||||
        Config config = new Config();
 | 
			
		||||
 | 
			
		||||
        if (zkEnabled) {
 | 
			
		||||
            addZkConfig(config);
 | 
			
		||||
    public CacheManager cacheManager() {
 | 
			
		||||
        SimpleCacheManager manager = new SimpleCacheManager();
 | 
			
		||||
        if (specs != null) {
 | 
			
		||||
            List<CaffeineCache> caches =
 | 
			
		||||
                    specs.entrySet().stream()
 | 
			
		||||
                            .map(entry -> buildCache(entry.getKey(),
 | 
			
		||||
                                    entry.getValue()))
 | 
			
		||||
                            .collect(Collectors.toList());
 | 
			
		||||
            manager.setCaches(caches);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        config.addMapConfig(createDeviceCredentialsCacheConfig());
 | 
			
		||||
 | 
			
		||||
        return Hazelcast.newHazelcastInstance(config);
 | 
			
		||||
        return manager;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void addZkConfig(Config config) {
 | 
			
		||||
        config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
 | 
			
		||||
        config.setProperty(GroupProperty.DISCOVERY_SPI_ENABLED.getName(), Boolean.TRUE.toString());
 | 
			
		||||
        DiscoveryStrategyConfig discoveryStrategyConfig = new DiscoveryStrategyConfig(new ZookeeperDiscoveryStrategyFactory());
 | 
			
		||||
        discoveryStrategyConfig.addProperty(ZookeeperDiscoveryProperties.ZOOKEEPER_URL.key(), zkUrl);
 | 
			
		||||
        discoveryStrategyConfig.addProperty(ZookeeperDiscoveryProperties.ZOOKEEPER_PATH.key(), zkDir);
 | 
			
		||||
        discoveryStrategyConfig.addProperty(ZookeeperDiscoveryProperties.GROUP.key(), HAZELCAST_CLUSTER_NAME);
 | 
			
		||||
        config.getNetworkConfig().getJoin().getDiscoveryConfig().addDiscoveryStrategyConfig(discoveryStrategyConfig);
 | 
			
		||||
    private CaffeineCache buildCache(String name, CacheSpecs cacheSpec) {
 | 
			
		||||
        final Caffeine<Object, Object> caffeineBuilder
 | 
			
		||||
                = Caffeine.newBuilder()
 | 
			
		||||
                .expireAfterWrite(cacheSpec.getTimeToLiveInMinutes(), TimeUnit.MINUTES)
 | 
			
		||||
                .maximumSize(cacheSpec.getMaxSize())
 | 
			
		||||
                .ticker(ticker());
 | 
			
		||||
        return new CaffeineCache(name, caffeineBuilder.build());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private MapConfig createDeviceCredentialsCacheConfig() {
 | 
			
		||||
        MapConfig deviceCredentialsCacheConfig = new MapConfig(CacheConstants.DEVICE_CREDENTIALS_CACHE);
 | 
			
		||||
        deviceCredentialsCacheConfig.setTimeToLiveSeconds(cacheDeviceCredentialsTTL);
 | 
			
		||||
        deviceCredentialsCacheConfig.setEvictionPolicy(EvictionPolicy.LRU);
 | 
			
		||||
        deviceCredentialsCacheConfig.setMaxSizeConfig(
 | 
			
		||||
                new MaxSizeConfig(
 | 
			
		||||
                        cacheDeviceCredentialsMaxSizeSize,
 | 
			
		||||
                        MaxSizeConfig.MaxSizePolicy.valueOf(cacheDeviceCredentialsMaxSizePolicy))
 | 
			
		||||
        );
 | 
			
		||||
        return deviceCredentialsCacheConfig;
 | 
			
		||||
    @Bean
 | 
			
		||||
    public Ticker ticker() {
 | 
			
		||||
        return Ticker.systemTicker();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Bean
 | 
			
		||||
@ -92,8 +73,4 @@ public class ServiceCacheConfiguration {
 | 
			
		||||
        return new PreviousDeviceCredentialsIdKeyGenerator();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Bean
 | 
			
		||||
    public CacheManager cacheManager() {
 | 
			
		||||
        return new HazelcastCacheManager(hazelcastInstance());
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -34,7 +34,7 @@ public interface DeviceService {
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<Device> findDeviceByIdAsync(DeviceId deviceId);
 | 
			
		||||
 | 
			
		||||
    Optional<Device> findDeviceByTenantIdAndName(TenantId tenantId, String name);
 | 
			
		||||
    Device findDeviceByTenantIdAndName(TenantId tenantId, String name);
 | 
			
		||||
 | 
			
		||||
    Device saveDevice(Device device);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -22,6 +22,10 @@ import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.commons.lang3.RandomStringUtils;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.cache.Cache;
 | 
			
		||||
import org.springframework.cache.CacheManager;
 | 
			
		||||
import org.springframework.cache.annotation.CacheEvict;
 | 
			
		||||
import org.springframework.cache.annotation.Cacheable;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.springframework.util.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.*;
 | 
			
		||||
@ -33,12 +37,12 @@ import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.page.TextPageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.TextPageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.EntityRelation;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
 | 
			
		||||
import org.thingsboard.server.common.data.security.DeviceCredentials;
 | 
			
		||||
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
 | 
			
		||||
import org.thingsboard.server.dao.customer.CustomerDao;
 | 
			
		||||
import org.thingsboard.server.dao.entity.AbstractEntityService;
 | 
			
		||||
import org.thingsboard.server.dao.exception.DataValidationException;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
 | 
			
		||||
import org.thingsboard.server.dao.service.DataValidator;
 | 
			
		||||
import org.thingsboard.server.dao.service.PaginatedRemover;
 | 
			
		||||
import org.thingsboard.server.dao.tenant.TenantDao;
 | 
			
		||||
@ -47,6 +51,7 @@ import javax.annotation.Nullable;
 | 
			
		||||
import java.util.*;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.server.common.data.CacheConstants.DEVICE_CACHE;
 | 
			
		||||
import static org.thingsboard.server.dao.DaoUtil.toUUIDs;
 | 
			
		||||
import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
 | 
			
		||||
import static org.thingsboard.server.dao.service.Validator.*;
 | 
			
		||||
@ -71,6 +76,9 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private DeviceCredentialsService deviceCredentialsService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private CacheManager cacheManager;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public Device findDeviceById(DeviceId deviceId) {
 | 
			
		||||
        log.trace("Executing findDeviceById [{}]", deviceId);
 | 
			
		||||
@ -85,18 +93,16 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
 | 
			
		||||
        return deviceDao.findByIdAsync(deviceId.getId());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Cacheable(cacheNames = DEVICE_CACHE, key = "{#tenantId, #name}")
 | 
			
		||||
    @Override
 | 
			
		||||
    public Optional<Device> findDeviceByTenantIdAndName(TenantId tenantId, String name) {
 | 
			
		||||
    public Device findDeviceByTenantIdAndName(TenantId tenantId, String name) {
 | 
			
		||||
        log.trace("Executing findDeviceByTenantIdAndName [{}][{}]", tenantId, name);
 | 
			
		||||
        validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
 | 
			
		||||
        Optional<Device> deviceOpt = deviceDao.findDeviceByTenantIdAndName(tenantId.getId(), name);
 | 
			
		||||
        if (deviceOpt.isPresent()) {
 | 
			
		||||
            return Optional.of(deviceOpt.get());
 | 
			
		||||
        } else {
 | 
			
		||||
            return Optional.empty();
 | 
			
		||||
        }
 | 
			
		||||
        return deviceOpt.orElse(null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @CacheEvict(cacheNames = DEVICE_CACHE, key = "{#device.tenantId, #device.name}")
 | 
			
		||||
    @Override
 | 
			
		||||
    public Device saveDevice(Device device) {
 | 
			
		||||
        log.trace("Executing saveDevice [{}]", device);
 | 
			
		||||
@ -129,12 +135,18 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
 | 
			
		||||
    @Override
 | 
			
		||||
    public void deleteDevice(DeviceId deviceId) {
 | 
			
		||||
        log.trace("Executing deleteDevice [{}]", deviceId);
 | 
			
		||||
        Cache cache = cacheManager.getCache(DEVICE_CACHE);
 | 
			
		||||
        validateId(deviceId, INCORRECT_DEVICE_ID + deviceId);
 | 
			
		||||
        DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(deviceId);
 | 
			
		||||
        if (deviceCredentials != null) {
 | 
			
		||||
            deviceCredentialsService.deleteDeviceCredentials(deviceCredentials);
 | 
			
		||||
        }
 | 
			
		||||
        deleteEntityRelations(deviceId);
 | 
			
		||||
        Device device = deviceDao.findById(deviceId.getId());
 | 
			
		||||
        List<Object> list = new ArrayList<>();
 | 
			
		||||
        list.add(device.getTenantId());
 | 
			
		||||
        list.add(device.getName());
 | 
			
		||||
        cache.evict(list);
 | 
			
		||||
        deviceDao.removeById(deviceId.getId());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -190,7 +202,7 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
 | 
			
		||||
        validateId(customerId, INCORRECT_CUSTOMER_ID + customerId);
 | 
			
		||||
        validateString(type, "Incorrect type " + type);
 | 
			
		||||
        validatePageLink(pageLink, INCORRECT_PAGE_LINK + pageLink);
 | 
			
		||||
        List<Device> devices =  deviceDao.findDevicesByTenantIdAndCustomerIdAndType(tenantId.getId(), customerId.getId(), type, pageLink);
 | 
			
		||||
        List<Device> devices = deviceDao.findDevicesByTenantIdAndCustomerIdAndType(tenantId.getId(), customerId.getId(), type, pageLink);
 | 
			
		||||
        return new TextPageData<>(devices, pageLink);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -244,10 +256,10 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
 | 
			
		||||
        validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
 | 
			
		||||
        ListenableFuture<List<EntitySubtype>> tenantDeviceTypes = deviceDao.findTenantDeviceTypesAsync(tenantId.getId());
 | 
			
		||||
        return Futures.transform(tenantDeviceTypes,
 | 
			
		||||
            (Function<List<EntitySubtype>, List<EntitySubtype>>) deviceTypes -> {
 | 
			
		||||
                deviceTypes.sort(Comparator.comparing(EntitySubtype::getType));
 | 
			
		||||
                return deviceTypes;
 | 
			
		||||
        });
 | 
			
		||||
                (Function<List<EntitySubtype>, List<EntitySubtype>>) deviceTypes -> {
 | 
			
		||||
                    deviceTypes.sort(Comparator.comparing(EntitySubtype::getType));
 | 
			
		||||
                    return deviceTypes;
 | 
			
		||||
                });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private DataValidator<Device> deviceValidator =
 | 
			
		||||
 | 
			
		||||
@ -21,6 +21,11 @@ import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.cache.Cache;
 | 
			
		||||
import org.springframework.cache.CacheManager;
 | 
			
		||||
import org.springframework.cache.annotation.CacheEvict;
 | 
			
		||||
import org.springframework.cache.annotation.Cacheable;
 | 
			
		||||
import org.springframework.cache.annotation.Caching;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.springframework.util.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
@ -34,6 +39,8 @@ import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
import java.util.function.BiConsumer;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.server.common.data.CacheConstants.RELATIONS_CACHE;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Created by ashvayka on 28.04.17.
 | 
			
		||||
 */
 | 
			
		||||
@ -47,6 +54,9 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private EntityService entityService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private CacheManager cacheManager;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Boolean> checkRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
 | 
			
		||||
        log.trace("Executing checkRelation [{}][{}][{}][{}]", from, to, relationType, typeGroup);
 | 
			
		||||
@ -54,6 +64,7 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
        return relationDao.checkRelation(from, to, relationType, typeGroup);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType}")
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<EntityRelation> getRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
 | 
			
		||||
        log.trace("Executing EntityRelation [{}][{}][{}][{}]", from, to, relationType, typeGroup);
 | 
			
		||||
@ -61,6 +72,12 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
        return relationDao.getRelation(from, to, relationType, typeGroup);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Caching(evict = {
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.from"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.to"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type}")
 | 
			
		||||
    })
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean saveRelation(EntityRelation relation) {
 | 
			
		||||
        log.trace("Executing saveRelation [{}]", relation);
 | 
			
		||||
@ -68,6 +85,12 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
        return relationDao.saveRelation(relation);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Caching(evict = {
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.from"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.to"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type}")
 | 
			
		||||
    })
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Boolean> saveRelationAsync(EntityRelation relation) {
 | 
			
		||||
        log.trace("Executing saveRelationAsync [{}]", relation);
 | 
			
		||||
@ -75,6 +98,13 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
        return relationDao.saveRelationAsync(relation);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Caching(evict = {
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.from"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.to"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type}")
 | 
			
		||||
    })
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean deleteRelation(EntityRelation relation) {
 | 
			
		||||
        log.trace("Executing deleteRelation [{}]", relation);
 | 
			
		||||
@ -82,6 +112,13 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
        return relationDao.deleteRelation(relation);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Caching(evict = {
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.from"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#relation.to"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type}")
 | 
			
		||||
    })
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Boolean> deleteRelationAsync(EntityRelation relation) {
 | 
			
		||||
        log.trace("Executing deleteRelationAsync [{}]", relation);
 | 
			
		||||
@ -89,6 +126,13 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
        return relationDao.deleteRelationAsync(relation);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Caching(evict = {
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#from"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#to"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType}")
 | 
			
		||||
    })
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean deleteRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
 | 
			
		||||
        log.trace("Executing deleteRelation [{}][{}][{}][{}]", from, to, relationType, typeGroup);
 | 
			
		||||
@ -96,6 +140,13 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
        return relationDao.deleteRelation(from, to, relationType, typeGroup);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Caching(evict = {
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#from"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "#to"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType}"),
 | 
			
		||||
            @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType}")
 | 
			
		||||
    })
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Boolean> deleteRelationAsync(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
 | 
			
		||||
        log.trace("Executing deleteRelationAsync [{}][{}][{}][{}]", from, to, relationType, typeGroup);
 | 
			
		||||
@ -105,23 +156,17 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean deleteEntityRelations(EntityId entity) {
 | 
			
		||||
        Cache cache = cacheManager.getCache(RELATIONS_CACHE);
 | 
			
		||||
        log.trace("Executing deleteEntityRelations [{}]", entity);
 | 
			
		||||
        validate(entity);
 | 
			
		||||
        List<ListenableFuture<List<EntityRelation>>> inboundRelationsList = new ArrayList<>();
 | 
			
		||||
        List<ListenableFuture<List<EntityRelation>>> inboundRelationsListTo = new ArrayList<>();
 | 
			
		||||
        for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {
 | 
			
		||||
            inboundRelationsList.add(relationDao.findAllByTo(entity, typeGroup));
 | 
			
		||||
            inboundRelationsListTo.add(relationDao.findAllByTo(entity, typeGroup));
 | 
			
		||||
        }
 | 
			
		||||
        ListenableFuture<List<List<EntityRelation>>> inboundRelations = Futures.allAsList(inboundRelationsList);
 | 
			
		||||
        ListenableFuture<List<Boolean>> inboundDeletions = Futures.transform(inboundRelations, new Function<List<List<EntityRelation>>, List<Boolean>>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public List<Boolean> apply(List<List<EntityRelation>> relations) {
 | 
			
		||||
                List<Boolean> results = new ArrayList<>();
 | 
			
		||||
                for (List<EntityRelation> relationList : relations) {
 | 
			
		||||
                    relationList.stream().forEach(relation -> results.add(relationDao.deleteRelation(relation)));
 | 
			
		||||
                }
 | 
			
		||||
                return results;
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
        ListenableFuture<List<List<EntityRelation>>> inboundRelationsTo = Futures.allAsList(inboundRelationsListTo);
 | 
			
		||||
        ListenableFuture<List<Boolean>> inboundDeletions = Futures.transform(inboundRelationsTo, (List<List<EntityRelation>> relations) ->
 | 
			
		||||
                getBooleans(relations, cache, true));
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<Boolean> inboundFuture = Futures.transform(inboundDeletions, getListToBooleanFunction());
 | 
			
		||||
        boolean inboundDeleteResult = false;
 | 
			
		||||
        try {
 | 
			
		||||
@ -129,37 +174,105 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
        } catch (InterruptedException | ExecutionException e) {
 | 
			
		||||
            log.error("Error deleting entity inbound relations", e);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        List<ListenableFuture<List<EntityRelation>>> inboundRelationsListFrom = new ArrayList<>();
 | 
			
		||||
        for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {
 | 
			
		||||
            inboundRelationsListFrom.add(relationDao.findAllByFrom(entity, typeGroup));
 | 
			
		||||
        }
 | 
			
		||||
        ListenableFuture<List<List<EntityRelation>>> inboundRelationsFrom = Futures.allAsList(inboundRelationsListFrom);
 | 
			
		||||
        Futures.transform(inboundRelationsFrom, (Function<List<List<EntityRelation>>, List<Boolean>>) relations ->
 | 
			
		||||
                getBooleans(relations, cache, false));
 | 
			
		||||
 | 
			
		||||
        boolean outboundDeleteResult = relationDao.deleteOutboundRelations(entity);
 | 
			
		||||
        return inboundDeleteResult && outboundDeleteResult;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private List<Boolean> getBooleans(List<List<EntityRelation>> relations, Cache cache, boolean isRemove) {
 | 
			
		||||
        List<Boolean> results = new ArrayList<>();
 | 
			
		||||
        for (List<EntityRelation> relationList : relations) {
 | 
			
		||||
            relationList.stream().forEach(relation -> {
 | 
			
		||||
                checkFromDeleteSync(cache, results, relation, isRemove);
 | 
			
		||||
            });
 | 
			
		||||
        }
 | 
			
		||||
        return results;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void checkFromDeleteSync(Cache cache, List<Boolean> results, EntityRelation relation, boolean isRemove) {
 | 
			
		||||
        if (isRemove) {
 | 
			
		||||
            results.add(relationDao.deleteRelation(relation));
 | 
			
		||||
            cacheEviction(relation, relation.getTo(), cache);
 | 
			
		||||
        } else {
 | 
			
		||||
            cacheEviction(relation, relation.getFrom(), cache);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Boolean> deleteEntityRelationsAsync(EntityId entity) {
 | 
			
		||||
        Cache cache = cacheManager.getCache(RELATIONS_CACHE);
 | 
			
		||||
        log.trace("Executing deleteEntityRelationsAsync [{}]", entity);
 | 
			
		||||
        validate(entity);
 | 
			
		||||
        List<ListenableFuture<List<EntityRelation>>> inboundRelationsList = new ArrayList<>();
 | 
			
		||||
        List<ListenableFuture<List<EntityRelation>>> inboundRelationsListTo = new ArrayList<>();
 | 
			
		||||
        for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {
 | 
			
		||||
            inboundRelationsList.add(relationDao.findAllByTo(entity, typeGroup));
 | 
			
		||||
            inboundRelationsListTo.add(relationDao.findAllByTo(entity, typeGroup));
 | 
			
		||||
        }
 | 
			
		||||
        ListenableFuture<List<List<EntityRelation>>> inboundRelations = Futures.allAsList(inboundRelationsList);
 | 
			
		||||
        ListenableFuture<List<Boolean>> inboundDeletions = Futures.transform(inboundRelations, new AsyncFunction<List<List<EntityRelation>>, List<Boolean>>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public ListenableFuture<List<Boolean>> apply(List<List<EntityRelation>> relations) throws Exception {
 | 
			
		||||
                List<ListenableFuture<Boolean>> results = new ArrayList<>();
 | 
			
		||||
                for (List<EntityRelation> relationList : relations) {
 | 
			
		||||
                    relationList.stream().forEach(relation -> results.add(relationDao.deleteRelationAsync(relation)));
 | 
			
		||||
                }
 | 
			
		||||
                return Futures.allAsList(results);
 | 
			
		||||
            }
 | 
			
		||||
        ListenableFuture<List<List<EntityRelation>>> inboundRelationsTo = Futures.allAsList(inboundRelationsListTo);
 | 
			
		||||
        ListenableFuture<List<Boolean>> inboundDeletions = Futures.transform(inboundRelationsTo,
 | 
			
		||||
                (AsyncFunction<List<List<EntityRelation>>, List<Boolean>>) relations -> {
 | 
			
		||||
            List<ListenableFuture<Boolean>> results = getListenableFutures(relations, cache, true);
 | 
			
		||||
            return Futures.allAsList(results);
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<Boolean> inboundFuture = Futures.transform(inboundDeletions, getListToBooleanFunction());
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<Boolean> outboundFuture = relationDao.deleteOutboundRelationsAsync(entity);
 | 
			
		||||
        List<ListenableFuture<List<EntityRelation>>> inboundRelationsListFrom = new ArrayList<>();
 | 
			
		||||
        for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {
 | 
			
		||||
            inboundRelationsListFrom.add(relationDao.findAllByTo(entity, typeGroup));
 | 
			
		||||
        }
 | 
			
		||||
        ListenableFuture<List<List<EntityRelation>>> inboundRelationsFrom = Futures.allAsList(inboundRelationsListFrom);
 | 
			
		||||
        Futures.transform(inboundRelationsFrom, (AsyncFunction<List<List<EntityRelation>>, List<Boolean>>) relations -> {
 | 
			
		||||
            List<ListenableFuture<Boolean>> results = getListenableFutures(relations, cache, false);
 | 
			
		||||
            return Futures.allAsList(results);
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<Boolean> outboundFuture = relationDao.deleteOutboundRelationsAsync(entity);
 | 
			
		||||
        return Futures.transform(Futures.allAsList(Arrays.asList(inboundFuture, outboundFuture)), getListToBooleanFunction());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private List<ListenableFuture<Boolean>> getListenableFutures(List<List<EntityRelation>> relations, Cache cache, boolean isRemove) {
 | 
			
		||||
        List<ListenableFuture<Boolean>> results = new ArrayList<>();
 | 
			
		||||
        for (List<EntityRelation> relationList : relations) {
 | 
			
		||||
            relationList.stream().forEach(relation -> {
 | 
			
		||||
                checkFromDeleteAsync(cache, results, relation, isRemove);
 | 
			
		||||
            });
 | 
			
		||||
        }
 | 
			
		||||
        return results;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void checkFromDeleteAsync(Cache cache, List<ListenableFuture<Boolean>> results, EntityRelation relation, boolean isRemove) {
 | 
			
		||||
        if (isRemove) {
 | 
			
		||||
            results.add(relationDao.deleteRelationAsync(relation));
 | 
			
		||||
            cacheEviction(relation, relation.getTo(), cache);
 | 
			
		||||
        } else {
 | 
			
		||||
            cacheEviction(relation, relation.getFrom(), cache);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void cacheEviction(EntityRelation relation, EntityId entityId, Cache cache) {
 | 
			
		||||
        cache.evict(entityId);
 | 
			
		||||
 | 
			
		||||
        List<Object> toAndType = new ArrayList<>();
 | 
			
		||||
        toAndType.add(entityId);
 | 
			
		||||
        toAndType.add(relation.getType());
 | 
			
		||||
        cache.evict(toAndType);
 | 
			
		||||
 | 
			
		||||
        List<Object> fromToAndType = new ArrayList<>();
 | 
			
		||||
        fromToAndType.add(relation.getFrom());
 | 
			
		||||
        fromToAndType.add(relation.getTo());
 | 
			
		||||
        fromToAndType.add(relation.getType());
 | 
			
		||||
        cache.evict(fromToAndType);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Cacheable(cacheNames = RELATIONS_CACHE, key = "#from")
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<List<EntityRelation>> findByFrom(EntityId from, RelationTypeGroup typeGroup) {
 | 
			
		||||
        log.trace("Executing findByFrom [{}][{}]", from, typeGroup);
 | 
			
		||||
@ -176,17 +289,18 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
        ListenableFuture<List<EntityRelation>> relations = relationDao.findAllByFrom(from, typeGroup);
 | 
			
		||||
        ListenableFuture<List<EntityRelationInfo>> relationsInfo = Futures.transform(relations,
 | 
			
		||||
                (AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> {
 | 
			
		||||
            List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>();
 | 
			
		||||
                    List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>();
 | 
			
		||||
                    relations1.stream().forEach(relation ->
 | 
			
		||||
                            futures.add(fetchRelationInfoAsync(relation,
 | 
			
		||||
                                    relation2 -> relation2.getTo(),
 | 
			
		||||
                                    (EntityRelationInfo relationInfo, String entityName) -> relationInfo.setToName(entityName)))
 | 
			
		||||
                    );
 | 
			
		||||
                    return Futures.successfulAsList(futures);
 | 
			
		||||
        });
 | 
			
		||||
                });
 | 
			
		||||
        return relationsInfo;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType}")
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<List<EntityRelation>> findByFromAndType(EntityId from, String relationType, RelationTypeGroup typeGroup) {
 | 
			
		||||
        log.trace("Executing findByFromAndType [{}][{}][{}]", from, relationType, typeGroup);
 | 
			
		||||
@ -196,6 +310,7 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
        return relationDao.findAllByFromAndType(from, relationType, typeGroup);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Cacheable(cacheNames = RELATIONS_CACHE, key = "#to")
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<List<EntityRelation>> findByTo(EntityId to, RelationTypeGroup typeGroup) {
 | 
			
		||||
        log.trace("Executing findByTo [{}][{}]", to, typeGroup);
 | 
			
		||||
@ -214,9 +329,9 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
                (AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> {
 | 
			
		||||
                    List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>();
 | 
			
		||||
                    relations1.stream().forEach(relation ->
 | 
			
		||||
                        futures.add(fetchRelationInfoAsync(relation,
 | 
			
		||||
                                relation2 -> relation2.getFrom(),
 | 
			
		||||
                                (EntityRelationInfo relationInfo, String entityName) -> relationInfo.setFromName(entityName)))
 | 
			
		||||
                            futures.add(fetchRelationInfoAsync(relation,
 | 
			
		||||
                                    relation2 -> relation2.getFrom(),
 | 
			
		||||
                                    (EntityRelationInfo relationInfo, String entityName) -> relationInfo.setFromName(entityName)))
 | 
			
		||||
                    );
 | 
			
		||||
                    return Futures.successfulAsList(futures);
 | 
			
		||||
                });
 | 
			
		||||
@ -236,6 +351,7 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
        return entityRelationInfo;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType}")
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<List<EntityRelation>> findByToAndType(EntityId to, String relationType, RelationTypeGroup typeGroup) {
 | 
			
		||||
        log.trace("Executing findByToAndType [{}][{}][{}]", to, relationType, typeGroup);
 | 
			
		||||
@ -417,5 +533,4 @@ public class BaseRelationService implements RelationService {
 | 
			
		||||
        }
 | 
			
		||||
        return relations;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -15,16 +15,14 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.service;
 | 
			
		||||
 | 
			
		||||
import com.hazelcast.core.HazelcastInstance;
 | 
			
		||||
import org.apache.commons.lang3.RandomStringUtils;
 | 
			
		||||
import org.junit.After;
 | 
			
		||||
import org.junit.Assert;
 | 
			
		||||
import org.junit.Before;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.springframework.aop.framework.Advised;
 | 
			
		||||
import org.springframework.aop.support.AopUtils;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.test.context.TestPropertySource;
 | 
			
		||||
import org.springframework.cache.CacheManager;
 | 
			
		||||
import org.springframework.test.util.ReflectionTestUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.CacheConstants;
 | 
			
		||||
import org.thingsboard.server.common.data.Device;
 | 
			
		||||
@ -40,7 +38,6 @@ import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
import static org.mockito.Mockito.*;
 | 
			
		||||
 | 
			
		||||
@TestPropertySource(properties = {"cache.enabled = true"})
 | 
			
		||||
public abstract class BaseDeviceCredentialsCacheTest extends AbstractServiceTest {
 | 
			
		||||
 | 
			
		||||
    private static final String CREDENTIALS_ID_1 = RandomStringUtils.randomAlphanumeric(20);
 | 
			
		||||
@ -53,7 +50,7 @@ public abstract class BaseDeviceCredentialsCacheTest extends AbstractServiceTest
 | 
			
		||||
    private DeviceService deviceService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private HazelcastInstance hazelcastInstance;
 | 
			
		||||
    private CacheManager cacheManager;
 | 
			
		||||
 | 
			
		||||
    private UUID deviceId = UUID.randomUUID();
 | 
			
		||||
 | 
			
		||||
@ -67,7 +64,7 @@ public abstract class BaseDeviceCredentialsCacheTest extends AbstractServiceTest
 | 
			
		||||
 | 
			
		||||
    @After
 | 
			
		||||
    public void cleanup() {
 | 
			
		||||
        hazelcastInstance.getMap(CacheConstants.DEVICE_CREDENTIALS_CACHE).evictAll();
 | 
			
		||||
        cacheManager.getCache(CacheConstants.DEVICE_CREDENTIALS_CACHE).clear();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
@ -77,7 +74,6 @@ public abstract class BaseDeviceCredentialsCacheTest extends AbstractServiceTest
 | 
			
		||||
        deviceCredentialsService.findDeviceCredentialsByCredentialsId(CREDENTIALS_ID_1);
 | 
			
		||||
        deviceCredentialsService.findDeviceCredentialsByCredentialsId(CREDENTIALS_ID_1);
 | 
			
		||||
 | 
			
		||||
        Assert.assertEquals(1, hazelcastInstance.getMap(CacheConstants.DEVICE_CREDENTIALS_CACHE).size());
 | 
			
		||||
        verify(deviceCredentialsDao, times(1)).findByCredentialsId(CREDENTIALS_ID_1);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -88,17 +84,13 @@ public abstract class BaseDeviceCredentialsCacheTest extends AbstractServiceTest
 | 
			
		||||
        deviceCredentialsService.findDeviceCredentialsByCredentialsId(CREDENTIALS_ID_1);
 | 
			
		||||
        deviceCredentialsService.findDeviceCredentialsByCredentialsId(CREDENTIALS_ID_1);
 | 
			
		||||
 | 
			
		||||
        Assert.assertEquals(1, hazelcastInstance.getMap(CacheConstants.DEVICE_CREDENTIALS_CACHE).size());
 | 
			
		||||
        verify(deviceCredentialsDao, times(1)).findByCredentialsId(CREDENTIALS_ID_1);
 | 
			
		||||
 | 
			
		||||
        deviceCredentialsService.deleteDeviceCredentials(createDummyDeviceCredentials(CREDENTIALS_ID_1, deviceId));
 | 
			
		||||
 | 
			
		||||
        Assert.assertEquals(0, hazelcastInstance.getMap(CacheConstants.DEVICE_CREDENTIALS_CACHE).size());
 | 
			
		||||
 | 
			
		||||
        deviceCredentialsService.findDeviceCredentialsByCredentialsId(CREDENTIALS_ID_1);
 | 
			
		||||
        deviceCredentialsService.findDeviceCredentialsByCredentialsId(CREDENTIALS_ID_1);
 | 
			
		||||
 | 
			
		||||
        Assert.assertEquals(1, hazelcastInstance.getMap(CacheConstants.DEVICE_CREDENTIALS_CACHE).size());
 | 
			
		||||
        verify(deviceCredentialsDao, times(2)).findByCredentialsId(CREDENTIALS_ID_1);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -109,7 +101,6 @@ public abstract class BaseDeviceCredentialsCacheTest extends AbstractServiceTest
 | 
			
		||||
        deviceCredentialsService.findDeviceCredentialsByCredentialsId(CREDENTIALS_ID_1);
 | 
			
		||||
        deviceCredentialsService.findDeviceCredentialsByCredentialsId(CREDENTIALS_ID_1);
 | 
			
		||||
 | 
			
		||||
        Assert.assertEquals(1, hazelcastInstance.getMap(CacheConstants.DEVICE_CREDENTIALS_CACHE).size());
 | 
			
		||||
        verify(deviceCredentialsDao, times(1)).findByCredentialsId(CREDENTIALS_ID_1);
 | 
			
		||||
 | 
			
		||||
        when(deviceCredentialsDao.findByDeviceId(deviceId)).thenReturn(createDummyDeviceCredentialsEntity(CREDENTIALS_ID_1));
 | 
			
		||||
@ -119,13 +110,11 @@ public abstract class BaseDeviceCredentialsCacheTest extends AbstractServiceTest
 | 
			
		||||
        when(deviceService.findDeviceById(new DeviceId(deviceId))).thenReturn(new Device());
 | 
			
		||||
 | 
			
		||||
        deviceCredentialsService.updateDeviceCredentials(createDummyDeviceCredentials(deviceCredentialsId, CREDENTIALS_ID_2, deviceId));
 | 
			
		||||
        Assert.assertEquals(0, hazelcastInstance.getMap(CacheConstants.DEVICE_CREDENTIALS_CACHE).size());
 | 
			
		||||
 | 
			
		||||
        when(deviceCredentialsDao.findByCredentialsId(CREDENTIALS_ID_1)).thenReturn(null);
 | 
			
		||||
 | 
			
		||||
        deviceCredentialsService.findDeviceCredentialsByCredentialsId(CREDENTIALS_ID_1);
 | 
			
		||||
        deviceCredentialsService.findDeviceCredentialsByCredentialsId(CREDENTIALS_ID_1);
 | 
			
		||||
        Assert.assertEquals(0, hazelcastInstance.getMap(CacheConstants.DEVICE_CREDENTIALS_CACHE).size());
 | 
			
		||||
 | 
			
		||||
        verify(deviceCredentialsDao, times(3)).findByCredentialsId(CREDENTIALS_ID_1);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,101 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2017 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.service;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import org.junit.After;
 | 
			
		||||
import org.junit.Before;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.springframework.aop.framework.Advised;
 | 
			
		||||
import org.springframework.aop.support.AopUtils;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.cache.CacheManager;
 | 
			
		||||
import org.springframework.test.util.ReflectionTestUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.EntityRelation;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
 | 
			
		||||
import org.thingsboard.server.dao.relation.RelationDao;
 | 
			
		||||
import org.thingsboard.server.dao.relation.RelationService;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
 | 
			
		||||
import static org.mockito.Mockito.*;
 | 
			
		||||
import static org.thingsboard.server.common.data.CacheConstants.RELATIONS_CACHE;
 | 
			
		||||
 | 
			
		||||
public abstract class BaseRelationCacheTest extends AbstractServiceTest {
 | 
			
		||||
 | 
			
		||||
    private static final EntityId ENTITY_ID_FROM = new DeviceId(UUID.randomUUID());
 | 
			
		||||
    private static final EntityId ENTITY_ID_TO = new DeviceId(UUID.randomUUID());
 | 
			
		||||
    private static final String RELATION_TYPE = "Contains";
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private RelationService relationService;
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private CacheManager cacheManager;
 | 
			
		||||
 | 
			
		||||
    private RelationDao relationDao;
 | 
			
		||||
 | 
			
		||||
    @Before
 | 
			
		||||
    public void setup() throws Exception {
 | 
			
		||||
        relationDao = mock(RelationDao.class);
 | 
			
		||||
        ReflectionTestUtils.setField(unwrapRelationService(), "relationDao", relationDao);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @After
 | 
			
		||||
    public void cleanup() {
 | 
			
		||||
        cacheManager.getCache(RELATIONS_CACHE).clear();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private RelationService unwrapRelationService() throws Exception {
 | 
			
		||||
        if (AopUtils.isAopProxy(relationService) && relationService instanceof Advised) {
 | 
			
		||||
            Object target = ((Advised) relationService).getTargetSource().getTarget();
 | 
			
		||||
            return (RelationService) target;
 | 
			
		||||
        }
 | 
			
		||||
        return null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testFindRelationByFrom_Cached() throws ExecutionException, InterruptedException {
 | 
			
		||||
        when(relationDao.getRelation(ENTITY_ID_FROM, ENTITY_ID_TO, RELATION_TYPE, RelationTypeGroup.COMMON))
 | 
			
		||||
                .thenReturn(Futures.immediateFuture(new EntityRelation(ENTITY_ID_FROM, ENTITY_ID_TO, RELATION_TYPE)));
 | 
			
		||||
 | 
			
		||||
        relationService.getRelation(ENTITY_ID_FROM, ENTITY_ID_TO, RELATION_TYPE, RelationTypeGroup.COMMON);
 | 
			
		||||
        relationService.getRelation(ENTITY_ID_FROM, ENTITY_ID_TO, RELATION_TYPE, RelationTypeGroup.COMMON);
 | 
			
		||||
 | 
			
		||||
        verify(relationDao, times(1)).getRelation(ENTITY_ID_FROM, ENTITY_ID_TO, RELATION_TYPE, RelationTypeGroup.COMMON);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testDeleteRelations_EvictsCache() {
 | 
			
		||||
        when(relationDao.getRelation(ENTITY_ID_FROM, ENTITY_ID_TO, RELATION_TYPE, RelationTypeGroup.COMMON))
 | 
			
		||||
                .thenReturn(Futures.immediateFuture(new EntityRelation(ENTITY_ID_FROM, ENTITY_ID_TO, RELATION_TYPE)));
 | 
			
		||||
 | 
			
		||||
        relationService.getRelation(ENTITY_ID_FROM, ENTITY_ID_TO, RELATION_TYPE, RelationTypeGroup.COMMON);
 | 
			
		||||
        relationService.getRelation(ENTITY_ID_FROM, ENTITY_ID_TO, RELATION_TYPE, RelationTypeGroup.COMMON);
 | 
			
		||||
 | 
			
		||||
        verify(relationDao, times(1)).getRelation(ENTITY_ID_FROM, ENTITY_ID_TO, RELATION_TYPE, RelationTypeGroup.COMMON);
 | 
			
		||||
 | 
			
		||||
        relationService.deleteRelation(ENTITY_ID_FROM, ENTITY_ID_TO, RELATION_TYPE, RelationTypeGroup.COMMON);
 | 
			
		||||
 | 
			
		||||
        relationService.getRelation(ENTITY_ID_FROM, ENTITY_ID_TO, RELATION_TYPE, RelationTypeGroup.COMMON);
 | 
			
		||||
        relationService.getRelation(ENTITY_ID_FROM, ENTITY_ID_TO, RELATION_TYPE, RelationTypeGroup.COMMON);
 | 
			
		||||
 | 
			
		||||
        verify(relationDao, times(2)).getRelation(ENTITY_ID_FROM, ENTITY_ID_TO, RELATION_TYPE, RelationTypeGroup.COMMON);
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,23 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2017 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.service.nosql;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.dao.service.BaseRelationCacheTest;
 | 
			
		||||
import org.thingsboard.server.dao.service.DaoNoSqlTest;
 | 
			
		||||
 | 
			
		||||
@DaoNoSqlTest
 | 
			
		||||
public class RelationCacheNoSqlTest extends BaseRelationCacheTest {
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,23 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2017 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.service.sql;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.dao.service.BaseRelationCacheTest;
 | 
			
		||||
import org.thingsboard.server.dao.service.DaoSqlTest;
 | 
			
		||||
 | 
			
		||||
@DaoSqlTest
 | 
			
		||||
public class RelationCacheSqlTest extends BaseRelationCacheTest {
 | 
			
		||||
}
 | 
			
		||||
@ -7,4 +7,13 @@ zk.enabled=false
 | 
			
		||||
zk.url=localhost:2181
 | 
			
		||||
zk.zk_dir=/thingsboard
 | 
			
		||||
 | 
			
		||||
updates.enabled=false
 | 
			
		||||
updates.enabled=false
 | 
			
		||||
 | 
			
		||||
caching.specs.relations.timeToLiveInMinutes=1440
 | 
			
		||||
caching.specs.relations.maxSize=100000
 | 
			
		||||
 | 
			
		||||
caching.specs.deviceCredentials.timeToLiveInMinutes=1440
 | 
			
		||||
caching.specs.deviceCredentials.maxSize=100000
 | 
			
		||||
 | 
			
		||||
caching.specs.devices.timeToLiveInMinutes=1440
 | 
			
		||||
caching.specs.devices.maxSize=100000
 | 
			
		||||
@ -108,12 +108,12 @@ public class MailPlugin extends AbstractPlugin<MailPluginConfiguration> implemen
 | 
			
		||||
        MimeMessage mailMsg = mailSender.createMimeMessage();
 | 
			
		||||
        MimeMessageHelper helper = new MimeMessageHelper(mailMsg, "UTF-8");
 | 
			
		||||
        helper.setFrom(msg.getFrom());
 | 
			
		||||
        helper.setTo(msg.getTo());
 | 
			
		||||
        helper.setTo(msg.getTo().split("\\s*,\\s*"));
 | 
			
		||||
        if (!StringUtils.isEmpty(msg.getCc())) {
 | 
			
		||||
            helper.setCc(msg.getCc());
 | 
			
		||||
            helper.setCc(msg.getCc().split("\\s*,\\s*"));
 | 
			
		||||
        }
 | 
			
		||||
        if (!StringUtils.isEmpty(msg.getBcc())) {
 | 
			
		||||
            helper.setBcc(msg.getBcc());
 | 
			
		||||
            helper.setBcc(msg.getBcc().split("\\s*,\\s*"));
 | 
			
		||||
        }
 | 
			
		||||
        helper.setSubject(msg.getSubject());
 | 
			
		||||
        helper.setText(msg.getBody());
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										6
									
								
								pom.xml
									
									
									
									
									
								
							
							
						
						
									
										6
									
								
								pom.xml
									
									
									
									
									
								
							@ -43,6 +43,7 @@
 | 
			
		||||
        <cassandra-unit.version>3.0.0.1</cassandra-unit.version>
 | 
			
		||||
        <takari-cpsuite.version>1.2.7</takari-cpsuite.version>
 | 
			
		||||
        <guava.version>18.0</guava.version>
 | 
			
		||||
        <caffeine.version>2.6.1</caffeine.version>
 | 
			
		||||
        <commons-lang3.version>3.4</commons-lang3.version>
 | 
			
		||||
        <commons-validator.version>1.5.0</commons-validator.version>
 | 
			
		||||
        <commons-io.version>2.5</commons-io.version>
 | 
			
		||||
@ -644,6 +645,11 @@
 | 
			
		||||
                <artifactId>guava</artifactId>
 | 
			
		||||
                <version>${guava.version}</version>
 | 
			
		||||
            </dependency>
 | 
			
		||||
            <dependency>
 | 
			
		||||
                <groupId>com.github.ben-manes.caffeine</groupId>
 | 
			
		||||
                <artifactId>caffeine</artifactId>
 | 
			
		||||
                <version>${caffeine.version}</version>
 | 
			
		||||
            </dependency>
 | 
			
		||||
            <dependency>
 | 
			
		||||
                <groupId>com.google.protobuf</groupId>
 | 
			
		||||
                <artifactId>protobuf-java</artifactId>
 | 
			
		||||
 | 
			
		||||
@ -19,6 +19,10 @@ import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.eclipse.californium.core.CoapResource;
 | 
			
		||||
import org.eclipse.californium.core.CoapServer;
 | 
			
		||||
import org.eclipse.californium.core.network.CoapEndpoint;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 | 
			
		||||
import org.thingsboard.server.common.transport.SessionMsgProcessor;
 | 
			
		||||
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
 | 
			
		||||
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.context.ApplicationContext;
 | 
			
		||||
@ -35,6 +39,7 @@ import java.net.InetSocketAddress;
 | 
			
		||||
import java.net.UnknownHostException;
 | 
			
		||||
 | 
			
		||||
@Service("CoapTransportService")
 | 
			
		||||
@ConditionalOnProperty(prefix = "coap", value = "enabled", havingValue = "true", matchIfMissing = true)
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class CoapTransportService {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -24,6 +24,7 @@ import io.netty.util.ResourceLeakDetector;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 | 
			
		||||
import org.springframework.context.ApplicationContext;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.common.transport.SessionMsgProcessor;
 | 
			
		||||
@ -40,6 +41,7 @@ import javax.annotation.PreDestroy;
 | 
			
		||||
 * @author Andrew Shvayka
 | 
			
		||||
 */
 | 
			
		||||
@Service("MqttTransportService")
 | 
			
		||||
@ConditionalOnProperty(prefix = "mqtt", value = "enabled", havingValue = "true", matchIfMissing = false)
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class MqttTransportService {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -84,16 +84,15 @@ public class GatewaySessionCtx {
 | 
			
		||||
 | 
			
		||||
    private void onDeviceConnect(String deviceName, String deviceType) {
 | 
			
		||||
        if (!devices.containsKey(deviceName)) {
 | 
			
		||||
            Optional<Device> deviceOpt = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);
 | 
			
		||||
            Device device = deviceOpt.orElseGet(() -> {
 | 
			
		||||
                Device newDevice = new Device();
 | 
			
		||||
                newDevice.setTenantId(gateway.getTenantId());
 | 
			
		||||
                newDevice.setName(deviceName);
 | 
			
		||||
                newDevice.setType(deviceType);
 | 
			
		||||
                newDevice = deviceService.saveDevice(newDevice);
 | 
			
		||||
                relationService.saveRelationAsync(new EntityRelation(gateway.getId(), newDevice.getId(), "Created"));
 | 
			
		||||
                return newDevice;
 | 
			
		||||
            });
 | 
			
		||||
            Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);
 | 
			
		||||
            if (device == null) {
 | 
			
		||||
                device = new Device();
 | 
			
		||||
                device.setTenantId(gateway.getTenantId());
 | 
			
		||||
                device.setName(deviceName);
 | 
			
		||||
                device.setType(deviceType);
 | 
			
		||||
                device = deviceService.saveDevice(device);
 | 
			
		||||
                relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));
 | 
			
		||||
            }
 | 
			
		||||
            GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device);
 | 
			
		||||
            devices.put(deviceName, ctx);
 | 
			
		||||
            log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName);
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user