diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java index c6cfc4d883..2c48b1134e 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,7 +16,10 @@ package org.thingsboard.server.cache; import java.io.Serializable; +import java.util.Collection; import java.util.List; +import java.util.function.Function; +import java.util.function.Supplier; public interface TbTransactionalCache { @@ -28,10 +31,35 @@ public interface TbTransactionalCache keys); + void evictOrPut(K key, V value); TbCacheTransaction newTransactionForKey(K key); TbCacheTransaction newTransactionForKeys(List keys); + default R getAndPutInTransaction(K key, Supplier dbCall, Function cacheValueToResult, Function dbValueToCacheValue, boolean cacheNullValue) { + TbCacheValueWrapper cacheValueWrapper = get(key); + if (cacheValueWrapper != null) { + var cacheValue = cacheValueWrapper.get(); + return cacheValue == null ? null : cacheValueToResult.apply(cacheValue); + } + var cacheTransaction = newTransactionForKey(key); + try { + R dbValue = dbCall.get(); + if (dbValue != null || cacheNullValue) { + cacheTransaction.putIfAbsent(key, dbValueToCacheValue.apply(dbValue)); + cacheTransaction.commit(); + return dbValue; + } else { + cacheTransaction.rollback(); + return null; + } + } catch (Throwable e) { + cacheTransaction.rollback(); + throw e; + } + } + } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/relation/RelationService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/relation/RelationService.java index 023e757689..dbd641c1c8 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/relation/RelationService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/relation/RelationService.java @@ -38,8 +38,6 @@ public interface RelationService { EntityRelation getRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup); - ListenableFuture getRelationAsync(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup); - boolean saveRelation(TenantId tenantId, EntityRelation relation); ListenableFuture saveRelationAsync(TenantId tenantId, EntityRelation relation); diff --git a/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbTransactionalCache.java b/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbTransactionalCache.java index ccca1d483d..19859ddea7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbTransactionalCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbTransactionalCache.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -23,6 +23,7 @@ import org.thingsboard.server.cache.TbCacheValueWrapper; import org.thingsboard.server.cache.TbTransactionalCache; import java.io.Serializable; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -71,6 +72,19 @@ public abstract class CaffeineTbTransactionalCache keys) { + lock.lock(); + try { + keys.forEach(key -> { + failAllTransactionsByKey(key); + doEvict(key); + }); + } finally { + lock.unlock(); + } + } + @Override public void evictOrPut(K key, V value) { //No need to put the value in case of Caffeine, because evict will cancel concurrent transaction used to "get" the missing value from cache. diff --git a/dao/src/main/java/org/thingsboard/server/dao/cache/RedisTbTransactionalCache.java b/dao/src/main/java/org/thingsboard/server/dao/cache/RedisTbTransactionalCache.java index 7b619032b9..8202a8dbbe 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cache/RedisTbTransactionalCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cache/RedisTbTransactionalCache.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -34,6 +34,7 @@ import org.thingsboard.server.cache.TbTransactionalCache; import java.io.Serializable; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; @@ -96,6 +97,13 @@ public abstract class RedisTbTransactionalCache keys) { + try (var connection = connectionFactory.getConnection()) { + connection.del(keys.stream().map(this::getRawKey).toArray(byte[][]::new)); + } + } + @Override public void evictOrPut(K key, V value) { try (var connection = connectionFactory.getConnection()) { @@ -117,8 +125,7 @@ public abstract class RedisTbTransactionalCache newTransactionForKeys(List keys) { - byte[][] rawKeysList = keys.stream().map(this::getRawKey).toArray(byte[][]::new); - RedisConnection connection = watch(rawKeysList); + RedisConnection connection = watch(keys.stream().map(this::getRawKey).toArray(byte[][]::new)); return new RedisTbCacheTransaction<>(this, connection); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/AttributeRedisCache.java b/dao/src/main/java/org/thingsboard/server/dao/relation/AttributeRedisCache.java new file mode 100644 index 0000000000..383d900d34 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/AttributeRedisCache.java @@ -0,0 +1,50 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.relation; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.serializer.SerializationException; +import org.springframework.stereotype.Service; +import org.thingsboard.server.cache.CacheSpecsMap; +import org.thingsboard.server.cache.TBRedisCacheConfiguration; +import org.thingsboard.server.common.data.CacheConstants; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.dao.attributes.AttributeCacheKey; +import org.thingsboard.server.dao.cache.RedisTbTransactionalCache; + +@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") +@Service("AttributeCache") +public class AttributeRedisCache extends RedisTbTransactionalCache { + + public AttributeRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) { + super(CacheConstants.ATTRIBUTES_CACHE, cacheSpecsMap, connectionFactory, configuration, new RedisSerializer<>() { + + private final RedisSerializer java = RedisSerializer.java(); + + @Override + public byte[] serialize(AttributeKvEntry attributeKvEntry) throws SerializationException { + return java.serialize(attributeKvEntry); + } + + @Override + public AttributeKvEntry deserialize(byte[] bytes) throws SerializationException { + return (AttributeKvEntry) java.deserialize(bytes); + } + }); + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java index 36ab0d9f85..3394d4ee8e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,23 +20,20 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.Cache; -import org.springframework.cache.CacheManager; -import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.Cacheable; -import org.springframework.cache.annotation.Caching; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.dao.ConcurrencyFailureException; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.event.TransactionalEventListener; import org.springframework.util.StringUtils; -import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.cache.TbTransactionalCache; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntityRelationInfo; import org.thingsboard.server.common.data.relation.EntityRelationsQuery; @@ -50,8 +47,6 @@ import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.service.ConstraintValidator; import javax.annotation.Nullable; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -68,17 +63,25 @@ import static org.thingsboard.server.dao.service.Validator.validateId; * Created by ashvayka on 28.04.17. */ @Service +@RequiredArgsConstructor @Slf4j public class BaseRelationService implements RelationService { - @Autowired - private RelationDao relationDao; + private final RelationDao relationDao; + private final EntityService entityService; + private final TbTransactionalCache cache; + private final ApplicationEventPublisher publisher; - @Autowired - private EntityService entityService; - - @Autowired - private CacheManager cacheManager; + @TransactionalEventListener(classes = EntityRelationEvent.class) + public void handleRelationEvictEvent(EntityRelationEvent event) { + List keys = new ArrayList<>(5); + keys.add(new RelationCacheKey(event.getFrom(), event.getTo(), event.getType(), event.getTypeGroup())); + keys.add(new RelationCacheKey(event.getFrom(), null, event.getType(), event.getTypeGroup(), EntitySearchDirection.FROM)); + keys.add(new RelationCacheKey(event.getFrom(), null, null, event.getTypeGroup(), EntitySearchDirection.FROM)); + keys.add(new RelationCacheKey(null, event.getTo(), event.getType(), event.getTypeGroup(), EntitySearchDirection.TO)); + keys.add(new RelationCacheKey(null, event.getTo(), null, event.getTypeGroup(), EntitySearchDirection.TO)); + cache.evict(keys); + } @Override public ListenableFuture checkRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { @@ -87,106 +90,76 @@ public class BaseRelationService implements RelationService { return relationDao.checkRelation(tenantId, from, to, relationType, typeGroup); } - @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}") @Transactional(propagation = Propagation.SUPPORTS) @Override public EntityRelation getRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { - return getRelation(tenantId, from, to, relationType, typeGroup); - } - - @Override - public ListenableFuture getRelationAsync(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { log.trace("Executing EntityRelation [{}][{}][{}][{}]", from, to, relationType, typeGroup); validate(from, to, relationType, typeGroup); - return relationDao.getRelation(tenantId, from, to, relationType, typeGroup); + RelationCacheKey cacheKey = new RelationCacheKey(from, to, relationType, typeGroup); + return cache.getAndPutInTransaction(cacheKey, + () -> relationDao.getRelation(tenantId, from, to, relationType, typeGroup), + RelationCacheValue::getRelation, + relations -> RelationCacheValue.builder().relation(relations).build(), false); } - @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup, 'FROM'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup, 'FROM'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup, 'TO'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup, 'TO'}") - }) @Override @Transactional(propagation = Propagation.SUPPORTS) public boolean saveRelation(TenantId tenantId, EntityRelation relation) { log.trace("Executing saveRelation [{}]", relation); validate(relation); - return relationDao.saveRelation(tenantId, relation); + var result = relationDao.saveRelation(tenantId, relation); + publisher.publishEvent(EntityRelationEvent.from(relation)); + return result; } - @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup, 'FROM'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup, 'FROM'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup, 'TO'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup, 'TO'}") - }) @Override public ListenableFuture saveRelationAsync(TenantId tenantId, EntityRelation relation) { log.trace("Executing saveRelationAsync [{}]", relation); validate(relation); - return relationDao.saveRelationAsync(tenantId, relation); + var future = relationDao.saveRelationAsync(tenantId, relation); + future.addListener(() -> handleRelationEvictEvent(EntityRelationEvent.from(relation)), MoreExecutors.directExecutor()); + return future; } - @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup, 'FROM'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup, 'FROM'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup, 'TO'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup, 'TO'}") - }) @Transactional(propagation = Propagation.SUPPORTS) @Override public boolean deleteRelation(TenantId tenantId, EntityRelation relation) { log.trace("Executing deleteRelation [{}]", relation); validate(relation); - return relationDao.deleteRelation(tenantId, relation); + var result = relationDao.deleteRelation(tenantId, relation); + //TODO: evict cache only if the relation was deleted. Note: relationDao.deleteRelation requires improvement. + publisher.publishEvent(EntityRelationEvent.from(relation)); + return result; } - @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup, 'FROM'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup, 'FROM'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup, 'TO'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup, 'TO'}") - }) @Override public ListenableFuture deleteRelationAsync(TenantId tenantId, EntityRelation relation) { log.trace("Executing deleteRelationAsync [{}]", relation); validate(relation); - return relationDao.deleteRelationAsync(tenantId, relation); + var future = relationDao.deleteRelationAsync(tenantId, relation); + future.addListener(() -> handleRelationEvictEvent(EntityRelationEvent.from(relation)), MoreExecutors.directExecutor()); + return future; } - @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup, 'FROM'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup, 'FROM'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup, 'TO'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup, 'TO'}") - }) @Transactional(propagation = Propagation.SUPPORTS) @Override public boolean deleteRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { log.trace("Executing deleteRelation [{}][{}][{}][{}]", from, to, relationType, typeGroup); validate(from, to, relationType, typeGroup); - return relationDao.deleteRelation(tenantId, from, to, relationType, typeGroup); + var result = relationDao.deleteRelation(tenantId, from, to, relationType, typeGroup); + //TODO: evict cache only if the relation was deleted. Note: relationDao.deleteRelation requires improvement. + publisher.publishEvent(new EntityRelationEvent(from, to, relationType, typeGroup)); + return result; } - @Caching(evict = { - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup, 'FROM'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup, 'FROM'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup, 'TO'}"), - @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup, 'TO'}") - }) @Override public ListenableFuture deleteRelationAsync(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { log.trace("Executing deleteRelationAsync [{}][{}][{}][{}]", from, to, relationType, typeGroup); validate(from, to, relationType, typeGroup); - //TODO: clear cache using transform - return relationDao.deleteRelationAsync(tenantId, from, to, relationType, typeGroup); + var future = relationDao.deleteRelationAsync(tenantId, from, to, relationType, typeGroup); + EntityRelationEvent event = new EntityRelationEvent(from, to, relationType, typeGroup); + future.addListener(() -> handleRelationEvictEvent(event), MoreExecutors.directExecutor()); + return future; } @Transactional(propagation = Propagation.SUPPORTS) @@ -194,7 +167,6 @@ public class BaseRelationService implements RelationService { public void deleteEntityRelations(TenantId tenantId, EntityId entityId) { log.trace("Executing deleteEntityRelations [{}]", entityId); validate(entityId); - final Cache cache = cacheManager.getCache(RELATIONS_CACHE); List inboundRelations = new ArrayList<>(); for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) { inboundRelations.addAll(relationDao.findAllByTo(tenantId, entityId, typeGroup)); @@ -206,11 +178,11 @@ public class BaseRelationService implements RelationService { } for (EntityRelation relation : inboundRelations) { - delete(tenantId, cache, relation, true); + delete(tenantId, relation, true); } for (EntityRelation relation : outboundRelations) { - delete(tenantId, cache, relation, false); + delete(tenantId, relation, false); } relationDao.deleteOutboundRelations(tenantId, entityId); @@ -219,7 +191,6 @@ public class BaseRelationService implements RelationService { @Override public ListenableFuture deleteEntityRelationsAsync(TenantId tenantId, EntityId entityId) { - Cache cache = cacheManager.getCache(RELATIONS_CACHE); log.trace("Executing deleteEntityRelationsAsync [{}]", entityId); validate(entityId); List>> inboundRelationsList = new ArrayList<>(); @@ -238,13 +209,13 @@ public class BaseRelationService implements RelationService { ListenableFuture> inboundDeletions = Futures.transformAsync(inboundRelations, relations -> { - List> results = deleteRelationGroupsAsync(tenantId, relations, cache, true); + List> results = deleteRelationGroupsAsync(tenantId, relations, true); return Futures.allAsList(results); }, MoreExecutors.directExecutor()); ListenableFuture> outboundDeletions = Futures.transformAsync(outboundRelations, relations -> { - List> results = deleteRelationGroupsAsync(tenantId, relations, cache, false); + List> results = deleteRelationGroupsAsync(tenantId, relations, false); return Futures.allAsList(results); }, MoreExecutors.directExecutor()); @@ -256,25 +227,29 @@ public class BaseRelationService implements RelationService { result -> null, MoreExecutors.directExecutor()); } - private List> deleteRelationGroupsAsync(TenantId tenantId, List> relations, Cache cache, boolean deleteFromDb) { + private List> deleteRelationGroupsAsync(TenantId tenantId, List> relations, boolean deleteFromDb) { List> results = new ArrayList<>(); for (List relationList : relations) { - relationList.forEach(relation -> results.add(deleteAsync(tenantId, cache, relation, deleteFromDb))); + relationList.forEach(relation -> results.add(deleteAsync(tenantId, relation, deleteFromDb))); } return results; } - private ListenableFuture deleteAsync(TenantId tenantId, Cache cache, EntityRelation relation, boolean deleteFromDb) { - cacheEviction(relation, cache); + private ListenableFuture deleteAsync(TenantId tenantId, EntityRelation relation, boolean deleteFromDb) { if (deleteFromDb) { - return relationDao.deleteRelationAsync(tenantId, relation); + return Futures.transform(relationDao.deleteRelationAsync(tenantId, relation), + bool -> { + handleRelationEvictEvent(EntityRelationEvent.from(relation)); + return bool; + }, MoreExecutors.directExecutor()); } else { + handleRelationEvictEvent(EntityRelationEvent.from(relation)); return Futures.immediateFuture(false); } } - boolean delete(TenantId tenantId, Cache cache, EntityRelation relation, boolean deleteFromDb) { - cacheEviction(relation, cache); + boolean delete(TenantId tenantId, EntityRelation relation, boolean deleteFromDb) { + publisher.publishEvent(EntityRelationEvent.from(relation)); if (deleteFromDb) { try { return relationDao.deleteRelation(tenantId, relation); @@ -285,49 +260,17 @@ public class BaseRelationService implements RelationService { return false; } - private void cacheEviction(EntityRelation relation, Cache cache) { - List fromToTypeAndTypeGroup = new ArrayList<>(); - fromToTypeAndTypeGroup.add(relation.getFrom()); - fromToTypeAndTypeGroup.add(relation.getTo()); - fromToTypeAndTypeGroup.add(relation.getType()); - fromToTypeAndTypeGroup.add(relation.getTypeGroup()); - cache.evict(fromToTypeAndTypeGroup); - - List fromTypeAndTypeGroup = new ArrayList<>(); - fromTypeAndTypeGroup.add(relation.getFrom()); - fromTypeAndTypeGroup.add(relation.getType()); - fromTypeAndTypeGroup.add(relation.getTypeGroup()); - fromTypeAndTypeGroup.add(EntitySearchDirection.FROM.name()); - cache.evict(fromTypeAndTypeGroup); - - List fromAndTypeGroup = new ArrayList<>(); - fromAndTypeGroup.add(relation.getFrom()); - fromAndTypeGroup.add(relation.getTypeGroup()); - fromAndTypeGroup.add(EntitySearchDirection.FROM.name()); - cache.evict(fromAndTypeGroup); - - List toAndTypeGroup = new ArrayList<>(); - toAndTypeGroup.add(relation.getTo()); - toAndTypeGroup.add(relation.getTypeGroup()); - toAndTypeGroup.add(EntitySearchDirection.TO.name()); - cache.evict(toAndTypeGroup); - - List toTypeAndTypeGroup = new ArrayList<>(); - toTypeAndTypeGroup.add(relation.getTo()); - toTypeAndTypeGroup.add(relation.getType()); - toTypeAndTypeGroup.add(relation.getTypeGroup()); - toTypeAndTypeGroup.add(EntitySearchDirection.TO.name()); - cache.evict(toTypeAndTypeGroup); - } - -// @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup, 'FROM'}") @Transactional(propagation = Propagation.SUPPORTS) @Override public List findByFrom(TenantId tenantId, EntityId from, RelationTypeGroup typeGroup) { validate(from); validateTypeGroup(typeGroup); log.trace("[{}] Find by from: [{}][{}]: ", tenantId, from, typeGroup, new RuntimeException()); - return relationDao.findAllByFrom(tenantId, from, typeGroup); + RelationCacheKey cacheKey = RelationCacheKey.builder().from(from).typeGroup(typeGroup).direction(EntitySearchDirection.FROM).build(); + return cache.getAndPutInTransaction(cacheKey, + () -> relationDao.findAllByFrom(tenantId, from, typeGroup), + RelationCacheValue::getRelations, + relations -> RelationCacheValue.builder().relations(relations).build(), false); } @Override @@ -381,7 +324,7 @@ public class BaseRelationService implements RelationService { }, MoreExecutors.directExecutor()); } -// @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup, 'FROM'}") + // @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup, 'FROM'}") @Override public List findByFromAndType(TenantId tenantId, EntityId from, String relationType, RelationTypeGroup typeGroup) { try { diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/EntityRelationEvent.java b/dao/src/main/java/org/thingsboard/server/dao/relation/EntityRelationEvent.java new file mode 100644 index 0000000000..e03f61b074 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/EntityRelationEvent.java @@ -0,0 +1,23 @@ +package org.thingsboard.server.dao.relation; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.relation.RelationTypeGroup; + +@RequiredArgsConstructor +public class EntityRelationEvent { + @Getter + private final EntityId from; + @Getter + private final EntityId to; + @Getter + private final String type; + @Getter + private final RelationTypeGroup typeGroup; + + public static EntityRelationEvent from(EntityRelation relation) { + return new EntityRelationEvent(relation.getFrom(), relation.getTo(), relation.getType(), relation.getTypeGroup()); + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/RelationCacheKey.java b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationCacheKey.java new file mode 100644 index 0000000000..e4c597808e --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationCacheKey.java @@ -0,0 +1,70 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.relation; + +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.relation.EntitySearchDirection; +import org.thingsboard.server.common.data.relation.RelationTypeGroup; + +import java.io.Serializable; + +@EqualsAndHashCode +@Getter +@RequiredArgsConstructor +@Builder +public class RelationCacheKey implements Serializable { + + private static final long serialVersionUID = 3911151843961657570L; + + private final EntityId from; + private final EntityId to; + private final String type; + private final RelationTypeGroup typeGroup; + private final EntitySearchDirection direction; + + public RelationCacheKey(EntityId from, EntityId to, String type, RelationTypeGroup typeGroup) { + this(from, to, type, typeGroup, null); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + boolean first = true; + first = addElement(sb, first, from); + first = addElement(sb, first, to); + first = addElement(sb, first, typeGroup); + first = addElement(sb, first, type); + first = addElement(sb, first, direction); + return sb.toString(); + } + + private boolean addElement(StringBuilder sb, boolean first, Object element) { + if (element != null) { + if (!first) { + sb.append("_"); + } + sb.append(element); + return false; + } else { + return first; + } + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/RelationCacheValue.java b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationCacheValue.java new file mode 100644 index 0000000000..348a750b08 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationCacheValue.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.relation; + +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.relation.RelationTypeGroup; + +import java.io.Serializable; +import java.util.List; + +@EqualsAndHashCode +@Getter +@RequiredArgsConstructor +@Builder +public class RelationCacheValue implements Serializable { + + private static final long serialVersionUID = 3911151843961657570L; + + private final EntityRelation relation; + private final List relations; + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/RelationCaffeineCache.java b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationCaffeineCache.java new file mode 100644 index 0000000000..b6d20331fc --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationCaffeineCache.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.relation; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cache.CacheManager; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.CacheConstants; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.dao.attributes.AttributeCacheKey; +import org.thingsboard.server.dao.cache.CaffeineTbTransactionalCache; + +@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true) +@Service("RelationCache") +public class RelationCaffeineCache extends CaffeineTbTransactionalCache { + + public RelationCaffeineCache(CacheManager cacheManager) { + super(cacheManager, CacheConstants.RELATIONS_CACHE); + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java index df285602ba..380923acf4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java @@ -43,7 +43,7 @@ public interface RelationDao { ListenableFuture checkRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup); - ListenableFuture getRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup); + EntityRelation getRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup); boolean saveRelation(TenantId tenantId, EntityRelation relation); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationDao.java index 78a94bfaa3..fd59ada95f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationDao.java @@ -107,9 +107,9 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple } @Override - public ListenableFuture getRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { + public EntityRelation getRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { RelationCompositeKey key = getRelationCompositeKey(from, to, relationType, typeGroup); - return service.submit(() -> DaoUtil.getData(relationRepository.findById(key))); + return DaoUtil.getData(relationRepository.findById(key)); } private RelationCompositeKey getRelationCompositeKey(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {