Start of the RelationService refactoring
This commit is contained in:
parent
9f3f2985ab
commit
a6f38902b7
@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@ -16,7 +16,10 @@
|
||||
package org.thingsboard.server.cache;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public interface TbTransactionalCache<K extends Serializable, V extends Serializable> {
|
||||
|
||||
@ -28,10 +31,35 @@ public interface TbTransactionalCache<K extends Serializable, V extends Serializ
|
||||
|
||||
void evict(K key);
|
||||
|
||||
void evict(Collection<K> keys);
|
||||
|
||||
void evictOrPut(K key, V value);
|
||||
|
||||
TbCacheTransaction<K, V> newTransactionForKey(K key);
|
||||
|
||||
TbCacheTransaction<K, V> newTransactionForKeys(List<K> keys);
|
||||
|
||||
default <R> R getAndPutInTransaction(K key, Supplier<R> dbCall, Function<V, R> cacheValueToResult, Function<R, V> dbValueToCacheValue, boolean cacheNullValue) {
|
||||
TbCacheValueWrapper<V> cacheValueWrapper = get(key);
|
||||
if (cacheValueWrapper != null) {
|
||||
var cacheValue = cacheValueWrapper.get();
|
||||
return cacheValue == null ? null : cacheValueToResult.apply(cacheValue);
|
||||
}
|
||||
var cacheTransaction = newTransactionForKey(key);
|
||||
try {
|
||||
R dbValue = dbCall.get();
|
||||
if (dbValue != null || cacheNullValue) {
|
||||
cacheTransaction.putIfAbsent(key, dbValueToCacheValue.apply(dbValue));
|
||||
cacheTransaction.commit();
|
||||
return dbValue;
|
||||
} else {
|
||||
cacheTransaction.rollback();
|
||||
return null;
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
cacheTransaction.rollback();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -38,8 +38,6 @@ public interface RelationService {
|
||||
|
||||
EntityRelation getRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup);
|
||||
|
||||
ListenableFuture<EntityRelation> getRelationAsync(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup);
|
||||
|
||||
boolean saveRelation(TenantId tenantId, EntityRelation relation);
|
||||
|
||||
ListenableFuture<Boolean> saveRelationAsync(TenantId tenantId, EntityRelation relation);
|
||||
|
||||
@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@ -23,6 +23,7 @@ import org.thingsboard.server.cache.TbCacheValueWrapper;
|
||||
import org.thingsboard.server.cache.TbTransactionalCache;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
@ -71,6 +72,19 @@ public abstract class CaffeineTbTransactionalCache<K extends Serializable, V ext
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evict(Collection<K> keys) {
|
||||
lock.lock();
|
||||
try {
|
||||
keys.forEach(key -> {
|
||||
failAllTransactionsByKey(key);
|
||||
doEvict(key);
|
||||
});
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evictOrPut(K key, V value) {
|
||||
//No need to put the value in case of Caffeine, because evict will cancel concurrent transaction used to "get" the missing value from cache.
|
||||
|
||||
@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@ -34,6 +34,7 @@ import org.thingsboard.server.cache.TbTransactionalCache;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ -96,6 +97,13 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evict(Collection<K> keys) {
|
||||
try (var connection = connectionFactory.getConnection()) {
|
||||
connection.del(keys.stream().map(this::getRawKey).toArray(byte[][]::new));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evictOrPut(K key, V value) {
|
||||
try (var connection = connectionFactory.getConnection()) {
|
||||
@ -117,8 +125,7 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
|
||||
|
||||
@Override
|
||||
public TbCacheTransaction<K, V> newTransactionForKeys(List<K> keys) {
|
||||
byte[][] rawKeysList = keys.stream().map(this::getRawKey).toArray(byte[][]::new);
|
||||
RedisConnection connection = watch(rawKeysList);
|
||||
RedisConnection connection = watch(keys.stream().map(this::getRawKey).toArray(byte[][]::new));
|
||||
return new RedisTbCacheTransaction<>(this, connection);
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,50 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.relation;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.serializer.RedisSerializer;
|
||||
import org.springframework.data.redis.serializer.SerializationException;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.cache.CacheSpecsMap;
|
||||
import org.thingsboard.server.cache.TBRedisCacheConfiguration;
|
||||
import org.thingsboard.server.common.data.CacheConstants;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.dao.attributes.AttributeCacheKey;
|
||||
import org.thingsboard.server.dao.cache.RedisTbTransactionalCache;
|
||||
|
||||
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
|
||||
@Service("AttributeCache")
|
||||
public class AttributeRedisCache extends RedisTbTransactionalCache<AttributeCacheKey, AttributeKvEntry> {
|
||||
|
||||
public AttributeRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) {
|
||||
super(CacheConstants.ATTRIBUTES_CACHE, cacheSpecsMap, connectionFactory, configuration, new RedisSerializer<>() {
|
||||
|
||||
private final RedisSerializer<Object> java = RedisSerializer.java();
|
||||
|
||||
@Override
|
||||
public byte[] serialize(AttributeKvEntry attributeKvEntry) throws SerializationException {
|
||||
return java.serialize(attributeKvEntry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AttributeKvEntry deserialize(byte[] bytes) throws SerializationException {
|
||||
return (AttributeKvEntry) java.deserialize(bytes);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@ -20,23 +20,20 @@ import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cache.Cache;
|
||||
import org.springframework.cache.CacheManager;
|
||||
import org.springframework.cache.annotation.CacheEvict;
|
||||
import org.springframework.cache.annotation.Cacheable;
|
||||
import org.springframework.cache.annotation.Caching;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.dao.ConcurrencyFailureException;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Propagation;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.transaction.event.TransactionalEventListener;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.cache.TbTransactionalCache;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelationInfo;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
|
||||
@ -50,8 +47,6 @@ import org.thingsboard.server.dao.exception.DataValidationException;
|
||||
import org.thingsboard.server.dao.service.ConstraintValidator;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
@ -68,17 +63,25 @@ import static org.thingsboard.server.dao.service.Validator.validateId;
|
||||
* Created by ashvayka on 28.04.17.
|
||||
*/
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class BaseRelationService implements RelationService {
|
||||
|
||||
@Autowired
|
||||
private RelationDao relationDao;
|
||||
private final RelationDao relationDao;
|
||||
private final EntityService entityService;
|
||||
private final TbTransactionalCache<RelationCacheKey, RelationCacheValue> cache;
|
||||
private final ApplicationEventPublisher publisher;
|
||||
|
||||
@Autowired
|
||||
private EntityService entityService;
|
||||
|
||||
@Autowired
|
||||
private CacheManager cacheManager;
|
||||
@TransactionalEventListener(classes = EntityRelationEvent.class)
|
||||
public void handleRelationEvictEvent(EntityRelationEvent event) {
|
||||
List<RelationCacheKey> keys = new ArrayList<>(5);
|
||||
keys.add(new RelationCacheKey(event.getFrom(), event.getTo(), event.getType(), event.getTypeGroup()));
|
||||
keys.add(new RelationCacheKey(event.getFrom(), null, event.getType(), event.getTypeGroup(), EntitySearchDirection.FROM));
|
||||
keys.add(new RelationCacheKey(event.getFrom(), null, null, event.getTypeGroup(), EntitySearchDirection.FROM));
|
||||
keys.add(new RelationCacheKey(null, event.getTo(), event.getType(), event.getTypeGroup(), EntitySearchDirection.TO));
|
||||
keys.add(new RelationCacheKey(null, event.getTo(), null, event.getTypeGroup(), EntitySearchDirection.TO));
|
||||
cache.evict(keys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Boolean> checkRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
|
||||
@ -87,106 +90,76 @@ public class BaseRelationService implements RelationService {
|
||||
return relationDao.checkRelation(tenantId, from, to, relationType, typeGroup);
|
||||
}
|
||||
|
||||
@Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}")
|
||||
@Transactional(propagation = Propagation.SUPPORTS)
|
||||
@Override
|
||||
public EntityRelation getRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
|
||||
return getRelation(tenantId, from, to, relationType, typeGroup);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<EntityRelation> getRelationAsync(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
|
||||
log.trace("Executing EntityRelation [{}][{}][{}][{}]", from, to, relationType, typeGroup);
|
||||
validate(from, to, relationType, typeGroup);
|
||||
return relationDao.getRelation(tenantId, from, to, relationType, typeGroup);
|
||||
RelationCacheKey cacheKey = new RelationCacheKey(from, to, relationType, typeGroup);
|
||||
return cache.getAndPutInTransaction(cacheKey,
|
||||
() -> relationDao.getRelation(tenantId, from, to, relationType, typeGroup),
|
||||
RelationCacheValue::getRelation,
|
||||
relations -> RelationCacheValue.builder().relation(relations).build(), false);
|
||||
}
|
||||
|
||||
@Caching(evict = {
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup, 'FROM'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup, 'FROM'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup, 'TO'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup, 'TO'}")
|
||||
})
|
||||
@Override
|
||||
@Transactional(propagation = Propagation.SUPPORTS)
|
||||
public boolean saveRelation(TenantId tenantId, EntityRelation relation) {
|
||||
log.trace("Executing saveRelation [{}]", relation);
|
||||
validate(relation);
|
||||
return relationDao.saveRelation(tenantId, relation);
|
||||
var result = relationDao.saveRelation(tenantId, relation);
|
||||
publisher.publishEvent(EntityRelationEvent.from(relation));
|
||||
return result;
|
||||
}
|
||||
|
||||
@Caching(evict = {
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup, 'FROM'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup, 'FROM'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup, 'TO'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup, 'TO'}")
|
||||
})
|
||||
@Override
|
||||
public ListenableFuture<Boolean> saveRelationAsync(TenantId tenantId, EntityRelation relation) {
|
||||
log.trace("Executing saveRelationAsync [{}]", relation);
|
||||
validate(relation);
|
||||
return relationDao.saveRelationAsync(tenantId, relation);
|
||||
var future = relationDao.saveRelationAsync(tenantId, relation);
|
||||
future.addListener(() -> handleRelationEvictEvent(EntityRelationEvent.from(relation)), MoreExecutors.directExecutor());
|
||||
return future;
|
||||
}
|
||||
|
||||
@Caching(evict = {
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup, 'FROM'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup, 'FROM'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup, 'TO'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup, 'TO'}")
|
||||
})
|
||||
@Transactional(propagation = Propagation.SUPPORTS)
|
||||
@Override
|
||||
public boolean deleteRelation(TenantId tenantId, EntityRelation relation) {
|
||||
log.trace("Executing deleteRelation [{}]", relation);
|
||||
validate(relation);
|
||||
return relationDao.deleteRelation(tenantId, relation);
|
||||
var result = relationDao.deleteRelation(tenantId, relation);
|
||||
//TODO: evict cache only if the relation was deleted. Note: relationDao.deleteRelation requires improvement.
|
||||
publisher.publishEvent(EntityRelationEvent.from(relation));
|
||||
return result;
|
||||
}
|
||||
|
||||
@Caching(evict = {
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.to, #relation.type, #relation.typeGroup}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.type, #relation.typeGroup, 'FROM'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.from, #relation.typeGroup, 'FROM'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.typeGroup, 'TO'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup, 'TO'}")
|
||||
})
|
||||
@Override
|
||||
public ListenableFuture<Boolean> deleteRelationAsync(TenantId tenantId, EntityRelation relation) {
|
||||
log.trace("Executing deleteRelationAsync [{}]", relation);
|
||||
validate(relation);
|
||||
return relationDao.deleteRelationAsync(tenantId, relation);
|
||||
var future = relationDao.deleteRelationAsync(tenantId, relation);
|
||||
future.addListener(() -> handleRelationEvictEvent(EntityRelationEvent.from(relation)), MoreExecutors.directExecutor());
|
||||
return future;
|
||||
}
|
||||
|
||||
@Caching(evict = {
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup, 'FROM'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup, 'FROM'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup, 'TO'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup, 'TO'}")
|
||||
})
|
||||
@Transactional(propagation = Propagation.SUPPORTS)
|
||||
@Override
|
||||
public boolean deleteRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
|
||||
log.trace("Executing deleteRelation [{}][{}][{}][{}]", from, to, relationType, typeGroup);
|
||||
validate(from, to, relationType, typeGroup);
|
||||
return relationDao.deleteRelation(tenantId, from, to, relationType, typeGroup);
|
||||
var result = relationDao.deleteRelation(tenantId, from, to, relationType, typeGroup);
|
||||
//TODO: evict cache only if the relation was deleted. Note: relationDao.deleteRelation requires improvement.
|
||||
publisher.publishEvent(new EntityRelationEvent(from, to, relationType, typeGroup));
|
||||
return result;
|
||||
}
|
||||
|
||||
@Caching(evict = {
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup, 'FROM'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup, 'FROM'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #typeGroup, 'TO'}"),
|
||||
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup, 'TO'}")
|
||||
})
|
||||
@Override
|
||||
public ListenableFuture<Boolean> deleteRelationAsync(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
|
||||
log.trace("Executing deleteRelationAsync [{}][{}][{}][{}]", from, to, relationType, typeGroup);
|
||||
validate(from, to, relationType, typeGroup);
|
||||
//TODO: clear cache using transform
|
||||
return relationDao.deleteRelationAsync(tenantId, from, to, relationType, typeGroup);
|
||||
var future = relationDao.deleteRelationAsync(tenantId, from, to, relationType, typeGroup);
|
||||
EntityRelationEvent event = new EntityRelationEvent(from, to, relationType, typeGroup);
|
||||
future.addListener(() -> handleRelationEvictEvent(event), MoreExecutors.directExecutor());
|
||||
return future;
|
||||
}
|
||||
|
||||
@Transactional(propagation = Propagation.SUPPORTS)
|
||||
@ -194,7 +167,6 @@ public class BaseRelationService implements RelationService {
|
||||
public void deleteEntityRelations(TenantId tenantId, EntityId entityId) {
|
||||
log.trace("Executing deleteEntityRelations [{}]", entityId);
|
||||
validate(entityId);
|
||||
final Cache cache = cacheManager.getCache(RELATIONS_CACHE);
|
||||
List<EntityRelation> inboundRelations = new ArrayList<>();
|
||||
for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {
|
||||
inboundRelations.addAll(relationDao.findAllByTo(tenantId, entityId, typeGroup));
|
||||
@ -206,11 +178,11 @@ public class BaseRelationService implements RelationService {
|
||||
}
|
||||
|
||||
for (EntityRelation relation : inboundRelations) {
|
||||
delete(tenantId, cache, relation, true);
|
||||
delete(tenantId, relation, true);
|
||||
}
|
||||
|
||||
for (EntityRelation relation : outboundRelations) {
|
||||
delete(tenantId, cache, relation, false);
|
||||
delete(tenantId, relation, false);
|
||||
}
|
||||
|
||||
relationDao.deleteOutboundRelations(tenantId, entityId);
|
||||
@ -219,7 +191,6 @@ public class BaseRelationService implements RelationService {
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> deleteEntityRelationsAsync(TenantId tenantId, EntityId entityId) {
|
||||
Cache cache = cacheManager.getCache(RELATIONS_CACHE);
|
||||
log.trace("Executing deleteEntityRelationsAsync [{}]", entityId);
|
||||
validate(entityId);
|
||||
List<ListenableFuture<List<EntityRelation>>> inboundRelationsList = new ArrayList<>();
|
||||
@ -238,13 +209,13 @@ public class BaseRelationService implements RelationService {
|
||||
|
||||
ListenableFuture<List<Boolean>> inboundDeletions = Futures.transformAsync(inboundRelations,
|
||||
relations -> {
|
||||
List<ListenableFuture<Boolean>> results = deleteRelationGroupsAsync(tenantId, relations, cache, true);
|
||||
List<ListenableFuture<Boolean>> results = deleteRelationGroupsAsync(tenantId, relations, true);
|
||||
return Futures.allAsList(results);
|
||||
}, MoreExecutors.directExecutor());
|
||||
|
||||
ListenableFuture<List<Boolean>> outboundDeletions = Futures.transformAsync(outboundRelations,
|
||||
relations -> {
|
||||
List<ListenableFuture<Boolean>> results = deleteRelationGroupsAsync(tenantId, relations, cache, false);
|
||||
List<ListenableFuture<Boolean>> results = deleteRelationGroupsAsync(tenantId, relations, false);
|
||||
return Futures.allAsList(results);
|
||||
}, MoreExecutors.directExecutor());
|
||||
|
||||
@ -256,25 +227,29 @@ public class BaseRelationService implements RelationService {
|
||||
result -> null, MoreExecutors.directExecutor());
|
||||
}
|
||||
|
||||
private List<ListenableFuture<Boolean>> deleteRelationGroupsAsync(TenantId tenantId, List<List<EntityRelation>> relations, Cache cache, boolean deleteFromDb) {
|
||||
private List<ListenableFuture<Boolean>> deleteRelationGroupsAsync(TenantId tenantId, List<List<EntityRelation>> relations, boolean deleteFromDb) {
|
||||
List<ListenableFuture<Boolean>> results = new ArrayList<>();
|
||||
for (List<EntityRelation> relationList : relations) {
|
||||
relationList.forEach(relation -> results.add(deleteAsync(tenantId, cache, relation, deleteFromDb)));
|
||||
relationList.forEach(relation -> results.add(deleteAsync(tenantId, relation, deleteFromDb)));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
private ListenableFuture<Boolean> deleteAsync(TenantId tenantId, Cache cache, EntityRelation relation, boolean deleteFromDb) {
|
||||
cacheEviction(relation, cache);
|
||||
private ListenableFuture<Boolean> deleteAsync(TenantId tenantId, EntityRelation relation, boolean deleteFromDb) {
|
||||
if (deleteFromDb) {
|
||||
return relationDao.deleteRelationAsync(tenantId, relation);
|
||||
return Futures.transform(relationDao.deleteRelationAsync(tenantId, relation),
|
||||
bool -> {
|
||||
handleRelationEvictEvent(EntityRelationEvent.from(relation));
|
||||
return bool;
|
||||
}, MoreExecutors.directExecutor());
|
||||
} else {
|
||||
handleRelationEvictEvent(EntityRelationEvent.from(relation));
|
||||
return Futures.immediateFuture(false);
|
||||
}
|
||||
}
|
||||
|
||||
boolean delete(TenantId tenantId, Cache cache, EntityRelation relation, boolean deleteFromDb) {
|
||||
cacheEviction(relation, cache);
|
||||
boolean delete(TenantId tenantId, EntityRelation relation, boolean deleteFromDb) {
|
||||
publisher.publishEvent(EntityRelationEvent.from(relation));
|
||||
if (deleteFromDb) {
|
||||
try {
|
||||
return relationDao.deleteRelation(tenantId, relation);
|
||||
@ -285,49 +260,17 @@ public class BaseRelationService implements RelationService {
|
||||
return false;
|
||||
}
|
||||
|
||||
private void cacheEviction(EntityRelation relation, Cache cache) {
|
||||
List<Object> fromToTypeAndTypeGroup = new ArrayList<>();
|
||||
fromToTypeAndTypeGroup.add(relation.getFrom());
|
||||
fromToTypeAndTypeGroup.add(relation.getTo());
|
||||
fromToTypeAndTypeGroup.add(relation.getType());
|
||||
fromToTypeAndTypeGroup.add(relation.getTypeGroup());
|
||||
cache.evict(fromToTypeAndTypeGroup);
|
||||
|
||||
List<Object> fromTypeAndTypeGroup = new ArrayList<>();
|
||||
fromTypeAndTypeGroup.add(relation.getFrom());
|
||||
fromTypeAndTypeGroup.add(relation.getType());
|
||||
fromTypeAndTypeGroup.add(relation.getTypeGroup());
|
||||
fromTypeAndTypeGroup.add(EntitySearchDirection.FROM.name());
|
||||
cache.evict(fromTypeAndTypeGroup);
|
||||
|
||||
List<Object> fromAndTypeGroup = new ArrayList<>();
|
||||
fromAndTypeGroup.add(relation.getFrom());
|
||||
fromAndTypeGroup.add(relation.getTypeGroup());
|
||||
fromAndTypeGroup.add(EntitySearchDirection.FROM.name());
|
||||
cache.evict(fromAndTypeGroup);
|
||||
|
||||
List<Object> toAndTypeGroup = new ArrayList<>();
|
||||
toAndTypeGroup.add(relation.getTo());
|
||||
toAndTypeGroup.add(relation.getTypeGroup());
|
||||
toAndTypeGroup.add(EntitySearchDirection.TO.name());
|
||||
cache.evict(toAndTypeGroup);
|
||||
|
||||
List<Object> toTypeAndTypeGroup = new ArrayList<>();
|
||||
toTypeAndTypeGroup.add(relation.getTo());
|
||||
toTypeAndTypeGroup.add(relation.getType());
|
||||
toTypeAndTypeGroup.add(relation.getTypeGroup());
|
||||
toTypeAndTypeGroup.add(EntitySearchDirection.TO.name());
|
||||
cache.evict(toTypeAndTypeGroup);
|
||||
}
|
||||
|
||||
// @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #typeGroup, 'FROM'}")
|
||||
@Transactional(propagation = Propagation.SUPPORTS)
|
||||
@Override
|
||||
public List<EntityRelation> findByFrom(TenantId tenantId, EntityId from, RelationTypeGroup typeGroup) {
|
||||
validate(from);
|
||||
validateTypeGroup(typeGroup);
|
||||
log.trace("[{}] Find by from: [{}][{}]: ", tenantId, from, typeGroup, new RuntimeException());
|
||||
return relationDao.findAllByFrom(tenantId, from, typeGroup);
|
||||
RelationCacheKey cacheKey = RelationCacheKey.builder().from(from).typeGroup(typeGroup).direction(EntitySearchDirection.FROM).build();
|
||||
return cache.getAndPutInTransaction(cacheKey,
|
||||
() -> relationDao.findAllByFrom(tenantId, from, typeGroup),
|
||||
RelationCacheValue::getRelations,
|
||||
relations -> RelationCacheValue.builder().relations(relations).build(), false);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -381,7 +324,7 @@ public class BaseRelationService implements RelationService {
|
||||
}, MoreExecutors.directExecutor());
|
||||
}
|
||||
|
||||
// @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup, 'FROM'}")
|
||||
// @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup, 'FROM'}")
|
||||
@Override
|
||||
public List<EntityRelation> findByFromAndType(TenantId tenantId, EntityId from, String relationType, RelationTypeGroup typeGroup) {
|
||||
try {
|
||||
|
||||
@ -0,0 +1,23 @@
|
||||
package org.thingsboard.server.dao.relation;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
public class EntityRelationEvent {
|
||||
@Getter
|
||||
private final EntityId from;
|
||||
@Getter
|
||||
private final EntityId to;
|
||||
@Getter
|
||||
private final String type;
|
||||
@Getter
|
||||
private final RelationTypeGroup typeGroup;
|
||||
|
||||
public static EntityRelationEvent from(EntityRelation relation) {
|
||||
return new EntityRelationEvent(relation.getFrom(), relation.getTo(), relation.getType(), relation.getTypeGroup());
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,70 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.relation;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
|
||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
@Builder
|
||||
public class RelationCacheKey implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 3911151843961657570L;
|
||||
|
||||
private final EntityId from;
|
||||
private final EntityId to;
|
||||
private final String type;
|
||||
private final RelationTypeGroup typeGroup;
|
||||
private final EntitySearchDirection direction;
|
||||
|
||||
public RelationCacheKey(EntityId from, EntityId to, String type, RelationTypeGroup typeGroup) {
|
||||
this(from, to, type, typeGroup, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
boolean first = true;
|
||||
first = addElement(sb, first, from);
|
||||
first = addElement(sb, first, to);
|
||||
first = addElement(sb, first, typeGroup);
|
||||
first = addElement(sb, first, type);
|
||||
first = addElement(sb, first, direction);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
private boolean addElement(StringBuilder sb, boolean first, Object element) {
|
||||
if (element != null) {
|
||||
if (!first) {
|
||||
sb.append("_");
|
||||
}
|
||||
sb.append(element);
|
||||
return false;
|
||||
} else {
|
||||
return first;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,40 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.relation;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
@Builder
|
||||
public class RelationCacheValue implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 3911151843961657570L;
|
||||
|
||||
private final EntityRelation relation;
|
||||
private final List<EntityRelation> relations;
|
||||
|
||||
}
|
||||
@ -0,0 +1,34 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.relation;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.cache.CacheManager;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.CacheConstants;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.dao.attributes.AttributeCacheKey;
|
||||
import org.thingsboard.server.dao.cache.CaffeineTbTransactionalCache;
|
||||
|
||||
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true)
|
||||
@Service("RelationCache")
|
||||
public class RelationCaffeineCache extends CaffeineTbTransactionalCache<RelationCacheKey, RelationCacheValue> {
|
||||
|
||||
public RelationCaffeineCache(CacheManager cacheManager) {
|
||||
super(cacheManager, CacheConstants.RELATIONS_CACHE);
|
||||
}
|
||||
|
||||
}
|
||||
@ -43,7 +43,7 @@ public interface RelationDao {
|
||||
|
||||
ListenableFuture<Boolean> checkRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup);
|
||||
|
||||
ListenableFuture<EntityRelation> getRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup);
|
||||
EntityRelation getRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup);
|
||||
|
||||
boolean saveRelation(TenantId tenantId, EntityRelation relation);
|
||||
|
||||
|
||||
@ -107,9 +107,9 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<EntityRelation> getRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
|
||||
public EntityRelation getRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
|
||||
RelationCompositeKey key = getRelationCompositeKey(from, to, relationType, typeGroup);
|
||||
return service.submit(() -> DaoUtil.getData(relationRepository.findById(key)));
|
||||
return DaoUtil.getData(relationRepository.findById(key));
|
||||
}
|
||||
|
||||
private RelationCompositeKey getRelationCompositeKey(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user