Asset Cache implementation

This commit is contained in:
Andrii Shvaika 2022-05-10 09:42:36 +03:00
parent eb6458738d
commit d9d68a0682
7 changed files with 207 additions and 38 deletions

View File

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.asset;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.thingsboard.server.common.data.id.TenantId;
@Data
@RequiredArgsConstructor
class AssetCacheEvictEvent {
private final TenantId tenantId;
private final String newName;
private final String oldName;
public AssetCacheEvictEvent(TenantId tenantId, String newName) {
this(tenantId, newName, null);
}
}

View File

@ -0,0 +1,42 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.asset;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.thingsboard.server.cache.CacheKeyUtil;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import java.io.Serializable;
@Getter
@EqualsAndHashCode
@RequiredArgsConstructor
@Builder
public class AssetCacheKey implements Serializable {
private final TenantId tenantId;
private final String name;
@Override
public String toString() {
return CacheKeyUtil.toString(tenantId, name);
}
}

View File

@ -0,0 +1,35 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.asset;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.CacheConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.dao.cache.CaffeineTbTransactionalCache;
import org.thingsboard.server.dao.device.DeviceCacheKey;
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true)
@Service("AssetCache")
public class AssetCaffeineCache extends CaffeineTbTransactionalCache<AssetCacheKey, Asset> {
public AssetCaffeineCache(CacheManager cacheManager) {
super(cacheManager, CacheConstants.ASSET_CACHE);
}
}

View File

@ -0,0 +1,51 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.asset;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import org.springframework.stereotype.Service;
import org.thingsboard.server.cache.CacheSpecsMap;
import org.thingsboard.server.cache.TBRedisCacheConfiguration;
import org.thingsboard.server.common.data.CacheConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.dao.cache.RedisTbTransactionalCache;
import org.thingsboard.server.dao.device.DeviceCacheKey;
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
@Service("AssetCache")
public class AssetRedisCache extends RedisTbTransactionalCache<AssetCacheKey, Asset> {
public AssetRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) {
super(CacheConstants.ASSET_CACHE, cacheSpecsMap, connectionFactory, configuration, new RedisSerializer<>() {
private final RedisSerializer<Object> java = RedisSerializer.java();
@Override
public byte[] serialize(Asset attributeKvEntry) throws SerializationException {
return java.serialize(attributeKvEntry);
}
@Override
public Asset deserialize(byte[] bytes) throws SerializationException {
return (Asset) java.deserialize(bytes);
}
});
}
}

View File

@ -22,12 +22,14 @@ import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.hibernate.exception.ConstraintViolationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionalEventListener;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.asset.AssetInfo;
import org.thingsboard.server.common.data.asset.AssetSearchQuery;
@ -42,8 +44,7 @@ import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.dao.cache.EntitiesCacheManager;
import org.thingsboard.server.dao.entity.AbstractEntityService;
import org.thingsboard.server.dao.entity.AbstractCachedEntityService;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.service.PaginatedRemover;
@ -55,7 +56,6 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.CacheConstants.ASSET_CACHE;
import static org.thingsboard.server.dao.DaoUtil.toUUIDs;
import static org.thingsboard.server.dao.service.Validator.validateId;
import static org.thingsboard.server.dao.service.Validator.validateIds;
@ -64,7 +64,7 @@ import static org.thingsboard.server.dao.service.Validator.validateString;
@Service
@Slf4j
public class BaseAssetService extends AbstractEntityService implements AssetService {
public class BaseAssetService extends AbstractCachedEntityService<AssetCacheKey, Asset, AssetCacheEvictEvent> implements AssetService {
public static final String INCORRECT_TENANT_ID = "Incorrect tenantId ";
public static final String INCORRECT_CUSTOMER_ID = "Incorrect customerId ";
@ -74,12 +74,20 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ
@Autowired
private AssetDao assetDao;
@Autowired
private EntitiesCacheManager cacheManager;
@Autowired
private DataValidator<Asset> assetValidator;
@TransactionalEventListener(classes = AssetCacheEvictEvent.class)
@Override
public void handleEvictEvent(AssetCacheEvictEvent event) {
List<AssetCacheKey> keys = new ArrayList<>(2);
keys.add(new AssetCacheKey(event.getTenantId(), event.getNewName()));
if (StringUtils.isNotEmpty(event.getOldName()) && !event.getOldName().equals(event.getNewName())) {
keys.add(new AssetCacheKey(event.getTenantId(), event.getOldName()));
}
cache.evict(keys);
}
@Override
public AssetInfo findAssetInfoById(TenantId tenantId, AssetId assetId) {
log.trace("Executing findAssetInfoById [{}]", assetId);
@ -101,23 +109,26 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ
return assetDao.findByIdAsync(tenantId, assetId.getId());
}
@Cacheable(cacheNames = ASSET_CACHE, key = "{#tenantId, #name}")
@Override
public Asset findAssetByTenantIdAndName(TenantId tenantId, String name) {
log.trace("Executing findAssetByTenantIdAndName [{}][{}]", tenantId, name);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
return assetDao.findAssetsByTenantIdAndName(tenantId.getId(), name)
.orElse(null);
return cache.getAndPutInTransaction(new AssetCacheKey(tenantId, name),
() -> assetDao.findAssetsByTenantIdAndName(tenantId.getId(), name)
.orElse(null), true);
}
@CacheEvict(cacheNames = ASSET_CACHE, key = "{#asset.tenantId, #asset.name}")
@Transactional(propagation = Propagation.SUPPORTS)
@Override
public Asset saveAsset(Asset asset) {
log.trace("Executing saveAsset [{}]", asset);
assetValidator.validate(asset, Asset::getTenantId);
Asset oldAsset = assetValidator.validate(asset, Asset::getTenantId);
Asset savedAsset;
AssetCacheEvictEvent evictEvent = new AssetCacheEvictEvent(asset.getTenantId(), asset.getName(), oldAsset != null ? oldAsset.getName() : null);
try {
savedAsset = assetDao.save(asset.getTenantId(), asset);
publishEvictEvent(evictEvent);
} catch (Exception t) {
ConstraintViolationException e = extractConstraintViolationException(t).orElse(null);
if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("asset_name_unq_key")) {
@ -129,6 +140,7 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ
return savedAsset;
}
@Transactional(propagation = Propagation.SUPPORTS)
@Override
public Asset assignAssetToCustomer(TenantId tenantId, AssetId assetId, CustomerId customerId) {
Asset asset = findAssetById(tenantId, assetId);
@ -136,6 +148,7 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ
return saveAsset(asset);
}
@Transactional(propagation = Propagation.SUPPORTS)
@Override
public Asset unassignAssetFromCustomer(TenantId tenantId, AssetId assetId) {
Asset asset = findAssetById(tenantId, assetId);
@ -143,6 +156,7 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ
return saveAsset(asset);
}
@Transactional(propagation = Propagation.SUPPORTS)
@Override
public void deleteAsset(TenantId tenantId, AssetId assetId) {
log.trace("Executing deleteAsset [{}]", assetId);
@ -160,7 +174,7 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ
throw new RuntimeException("Exception while finding entity views for assetId [" + assetId + "]", e);
}
cacheManager.removeAssetFromCacheByName(asset.getTenantId(), asset.getName());
publishEvictEvent(new AssetCacheEvictEvent(asset.getTenantId(), asset.getName(), null));
assetDao.removeById(tenantId, assetId.getId());
}

View File

@ -20,11 +20,11 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
@Data
public class DeviceEvent {
class DeviceCacheEvictEvent {
private final TenantId tenantId;
private final DeviceId deviceId;
private final String newDeviceName;
private final String oldDeviceName;
private final String newName;
private final String oldName;
}

View File

@ -23,15 +23,12 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.hibernate.exception.ConstraintViolationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionalEventListener;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.CollectionUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.cache.TbTransactionalCache;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceInfo;
import org.thingsboard.server.common.data.DeviceProfile;
@ -69,7 +66,6 @@ import org.thingsboard.server.dao.device.provision.ProvisionFailedException;
import org.thingsboard.server.dao.device.provision.ProvisionRequest;
import org.thingsboard.server.dao.device.provision.ProvisionResponseStatus;
import org.thingsboard.server.dao.entity.AbstractCachedEntityService;
import org.thingsboard.server.dao.entity.AbstractEntityService;
import org.thingsboard.server.dao.event.EventService;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.service.DataValidator;
@ -84,7 +80,6 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.CacheConstants.DEVICE_CACHE;
import static org.thingsboard.server.dao.DaoUtil.toUUIDs;
import static org.thingsboard.server.dao.service.Validator.validateId;
import static org.thingsboard.server.dao.service.Validator.validateIds;
@ -93,7 +88,7 @@ import static org.thingsboard.server.dao.service.Validator.validateString;
@Service
@Slf4j
public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKey, Device, DeviceEvent> implements DeviceService {
public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKey, Device, DeviceCacheEvictEvent> implements DeviceService {
public static final String INCORRECT_TENANT_ID = "Incorrect tenantId ";
public static final String INCORRECT_DEVICE_PROFILE_ID = "Incorrect deviceProfileId ";
@ -222,7 +217,7 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
} else if (device.getId() != null) {
oldDevice = findDeviceById(device.getTenantId(), device.getId());
}
DeviceEvent deviceEvent = new DeviceEvent(device.getTenantId(), device.getId(), device.getName(), oldDevice != null ? oldDevice.getName() : null);
DeviceCacheEvictEvent deviceCacheEvictEvent = new DeviceCacheEvictEvent(device.getTenantId(), device.getId(), device.getName(), oldDevice != null ? oldDevice.getName() : null);
try {
DeviceProfile deviceProfile;
if (device.getDeviceProfileId() == null) {
@ -241,13 +236,13 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
device.setType(deviceProfile.getName());
device.setDeviceData(syncDeviceData(deviceProfile, device.getDeviceData()));
Device result = deviceDao.saveAndFlush(device.getTenantId(), device);
publishEvictEvent(deviceEvent);
publishEvictEvent(deviceCacheEvictEvent);
return result;
} catch (Exception t) {
ConstraintViolationException e = extractConstraintViolationException(t).orElse(null);
if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("device_name_unq_key")) {
// remove device from cache in case null value cached in the distributed redis.
handleEvictEvent(deviceEvent);
handleEvictEvent(deviceCacheEvictEvent);
throw new DataValidationException("Device with such name already exists!");
} else {
throw t;
@ -255,16 +250,16 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
}
}
@TransactionalEventListener(classes = DeviceEvent.class)
@TransactionalEventListener(classes = DeviceCacheEvictEvent.class)
@Override
public void handleEvictEvent(DeviceEvent event) {
public void handleEvictEvent(DeviceCacheEvictEvent event) {
List<DeviceCacheKey> keys = new ArrayList<>(3);
keys.add(new DeviceCacheKey(event.getTenantId(), event.getNewDeviceName()));
keys.add(new DeviceCacheKey(event.getTenantId(), event.getNewName()));
if (event.getDeviceId() != null) {
keys.add(new DeviceCacheKey(event.getTenantId(), event.getDeviceId()));
}
if (StringUtils.isNotEmpty(event.getOldDeviceName()) && !event.getOldDeviceName().equals(event.getNewDeviceName())) {
keys.add(new DeviceCacheKey(event.getTenantId(), event.getOldDeviceName()));
if (StringUtils.isNotEmpty(event.getOldName()) && !event.getOldName().equals(event.getNewName())) {
keys.add(new DeviceCacheKey(event.getTenantId(), event.getOldName()));
}
cache.evict(keys);
}
@ -323,7 +318,7 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
validateId(deviceId, INCORRECT_DEVICE_ID + deviceId);
Device device = deviceDao.findById(tenantId, deviceId.getId());
DeviceEvent deviceEvent = new DeviceEvent(device.getTenantId(), device.getId(), device.getName(), null);
DeviceCacheEvictEvent deviceCacheEvictEvent = new DeviceCacheEvictEvent(device.getTenantId(), device.getId(), device.getName(), null);
try {
List<EntityView> entityViews = entityViewService.findEntityViewsByTenantIdAndEntityIdAsync(device.getTenantId(), deviceId).get();
if (entityViews != null && !entityViews.isEmpty()) {
@ -342,7 +337,7 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
deviceDao.removeById(tenantId, deviceId.getId());
publishEvictEvent(deviceEvent);
publishEvictEvent(deviceCacheEvictEvent);
}
@ -571,8 +566,8 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
device.setCustomerId(null);
Device savedDevice = doSaveDevice(device, null, true);
DeviceEvent oldTenantEvent = new DeviceEvent(oldTenantId, device.getId(), device.getName(), null);
DeviceEvent newTenantEvent = new DeviceEvent(savedDevice.getTenantId(), device.getId(), device.getName(), null);
DeviceCacheEvictEvent oldTenantEvent = new DeviceCacheEvictEvent(oldTenantId, device.getId(), device.getName(), null);
DeviceCacheEvictEvent newTenantEvent = new DeviceCacheEvictEvent(savedDevice.getTenantId(), device.getId(), device.getName(), null);
// explicitly remove device with previous tenant id from cache
// result device object will have different tenant id and will not remove entity from cache
@ -583,7 +578,6 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
}
@Override
@CacheEvict(cacheNames = DEVICE_CACHE, key = "{#profile.tenantId, #provisionRequest.deviceName}")
@Transactional
public Device saveDevice(ProvisionRequest provisionRequest, DeviceProfile profile) {
Device device = new Device();
@ -626,7 +620,7 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
}
}
publishEvictEvent(new DeviceEvent(savedDevice.getTenantId(), savedDevice.getId(), provisionRequest.getDeviceName(), null));
publishEvictEvent(new DeviceCacheEvictEvent(savedDevice.getTenantId(), savedDevice.getId(), provisionRequest.getDeviceName(), null));
return savedDevice;
}