Merge pull request #12365 from AndriiLandiak/fix/related-edges-cache
Improve related edges cache cleanup
This commit is contained in:
		
						commit
						87d5a28cb6
					
				@ -600,7 +600,7 @@ cache:
 | 
				
			|||||||
      maxSize: "${CACHE_SPECS_EDGE_SESSIONS_MAX_SIZE:10000}" # 0 means the cache is disabled
 | 
					      maxSize: "${CACHE_SPECS_EDGE_SESSIONS_MAX_SIZE:10000}" # 0 means the cache is disabled
 | 
				
			||||||
    relatedEdges:
 | 
					    relatedEdges:
 | 
				
			||||||
      timeToLiveInMinutes: "${CACHE_SPECS_RELATED_EDGES_TTL:1440}" # Related Edges cache TTL
 | 
					      timeToLiveInMinutes: "${CACHE_SPECS_RELATED_EDGES_TTL:1440}" # Related Edges cache TTL
 | 
				
			||||||
      maxSize: "${CACHE_SPECS_RELATED_EDGES_MAX_SIZE:0}" # 0 means the cache is disabled
 | 
					      maxSize: "${CACHE_SPECS_RELATED_EDGES_MAX_SIZE:10000}" # 0 means the cache is disabled
 | 
				
			||||||
    repositorySettings:
 | 
					    repositorySettings:
 | 
				
			||||||
      timeToLiveInMinutes: "${CACHE_SPECS_REPOSITORY_SETTINGS_TTL:1440}" # Repository settings cache TTL
 | 
					      timeToLiveInMinutes: "${CACHE_SPECS_REPOSITORY_SETTINGS_TTL:1440}" # Repository settings cache TTL
 | 
				
			||||||
      maxSize: "${CACHE_SPECS_REPOSITORY_SETTINGS_MAX_SIZE:10000}" # 0 means the cache is disabled
 | 
					      maxSize: "${CACHE_SPECS_REPOSITORY_SETTINGS_MAX_SIZE:10000}" # 0 means the cache is disabled
 | 
				
			||||||
 | 
				
			|||||||
@ -19,9 +19,11 @@ import lombok.Getter;
 | 
				
			|||||||
import lombok.extern.slf4j.Slf4j;
 | 
					import lombok.extern.slf4j.Slf4j;
 | 
				
			||||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
					import org.springframework.beans.factory.annotation.Autowired;
 | 
				
			||||||
import org.springframework.cache.support.NullValue;
 | 
					import org.springframework.cache.support.NullValue;
 | 
				
			||||||
 | 
					import org.springframework.dao.InvalidDataAccessApiUsageException;
 | 
				
			||||||
import org.springframework.data.redis.connection.RedisConnection;
 | 
					import org.springframework.data.redis.connection.RedisConnection;
 | 
				
			||||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
 | 
					import org.springframework.data.redis.connection.RedisConnectionFactory;
 | 
				
			||||||
import org.springframework.data.redis.connection.RedisStringCommands;
 | 
					import org.springframework.data.redis.connection.RedisStringCommands;
 | 
				
			||||||
 | 
					import org.springframework.data.redis.connection.ReturnType;
 | 
				
			||||||
import org.springframework.data.redis.connection.jedis.JedisClusterConnection;
 | 
					import org.springframework.data.redis.connection.jedis.JedisClusterConnection;
 | 
				
			||||||
import org.springframework.data.redis.connection.jedis.JedisConnection;
 | 
					import org.springframework.data.redis.connection.jedis.JedisConnection;
 | 
				
			||||||
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
 | 
					import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
 | 
				
			||||||
@ -269,4 +271,24 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
 | 
				
			|||||||
        connection.stringCommands().set(rawKey, rawValue, this.cacheTtl, setOption);
 | 
					        connection.stringCommands().set(rawKey, rawValue, this.cacheTtl, setOption);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    protected void executeScript(RedisConnection connection, byte[] scriptSha, byte[] luaScript, ReturnType returnType, int numKeys, byte[]... keysAndArgs) {
 | 
				
			||||||
 | 
					        try {
 | 
				
			||||||
 | 
					            connection.scriptingCommands().evalSha(scriptSha, returnType, numKeys, keysAndArgs);
 | 
				
			||||||
 | 
					        } catch (InvalidDataAccessApiUsageException ignored) {
 | 
				
			||||||
 | 
					            log.debug("Loading LUA with expected SHA [{}], connection [{}]", new String(scriptSha), connection.getNativeConnection());
 | 
				
			||||||
 | 
					            String actualSha = connection.scriptingCommands().scriptLoad(luaScript);
 | 
				
			||||||
 | 
					            if (!Arrays.equals(scriptSha, StringRedisSerializer.UTF_8.serialize(actualSha))) {
 | 
				
			||||||
 | 
					                String message = String.format("SHA for LUA script wrong! Expected [%s], but actual [%s], connection [%s]",
 | 
				
			||||||
 | 
					                        new String(scriptSha), actualSha, connection.getNativeConnection());
 | 
				
			||||||
 | 
					                throw new IllegalStateException(message);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            try {
 | 
				
			||||||
 | 
					                connection.scriptingCommands().evalSha(scriptSha, returnType, numKeys, keysAndArgs);
 | 
				
			||||||
 | 
					            } catch (InvalidDataAccessApiUsageException exception) {
 | 
				
			||||||
 | 
					                log.warn("Slowly executing eval instead of fast evalSha", exception);
 | 
				
			||||||
 | 
					                connection.scriptingCommands().eval(luaScript, returnType, numKeys, keysAndArgs);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -15,10 +15,8 @@
 | 
				
			|||||||
 */
 | 
					 */
 | 
				
			||||||
package org.thingsboard.server.cache;
 | 
					package org.thingsboard.server.cache;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import jakarta.annotation.PostConstruct;
 | 
					 | 
				
			||||||
import lombok.extern.slf4j.Slf4j;
 | 
					import lombok.extern.slf4j.Slf4j;
 | 
				
			||||||
import org.apache.commons.lang3.NotImplementedException;
 | 
					import org.apache.commons.lang3.NotImplementedException;
 | 
				
			||||||
import org.springframework.dao.InvalidDataAccessApiUsageException;
 | 
					 | 
				
			||||||
import org.springframework.data.redis.connection.RedisConnection;
 | 
					import org.springframework.data.redis.connection.RedisConnection;
 | 
				
			||||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
 | 
					import org.springframework.data.redis.connection.RedisConnectionFactory;
 | 
				
			||||||
import org.springframework.data.redis.connection.ReturnType;
 | 
					import org.springframework.data.redis.connection.ReturnType;
 | 
				
			||||||
@ -27,7 +25,6 @@ import org.springframework.data.redis.serializer.StringRedisSerializer;
 | 
				
			|||||||
import org.thingsboard.server.common.data.HasVersion;
 | 
					import org.thingsboard.server.common.data.HasVersion;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import java.io.Serializable;
 | 
					import java.io.Serializable;
 | 
				
			||||||
import java.util.Arrays;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
@Slf4j
 | 
					@Slf4j
 | 
				
			||||||
public abstract class VersionedRedisTbCache<K extends VersionedCacheKey, V extends Serializable & HasVersion> extends RedisTbTransactionalCache<K, V> implements VersionedTbCache<K, V> {
 | 
					public abstract class VersionedRedisTbCache<K extends VersionedCacheKey, V extends Serializable & HasVersion> extends RedisTbTransactionalCache<K, V> implements VersionedTbCache<K, V> {
 | 
				
			||||||
@ -65,19 +62,6 @@ public abstract class VersionedRedisTbCache<K extends VersionedCacheKey, V exten
 | 
				
			|||||||
        super(cacheName, cacheSpecsMap, connectionFactory, configuration, valueSerializer);
 | 
					        super(cacheName, cacheSpecsMap, connectionFactory, configuration, valueSerializer);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @PostConstruct
 | 
					 | 
				
			||||||
    public void init() {
 | 
					 | 
				
			||||||
        try (var connection = getConnection(SET_VERSIONED_VALUE_SHA)) {
 | 
					 | 
				
			||||||
            log.debug("Loading LUA with expected SHA[{}], connection [{}]", new String(SET_VERSIONED_VALUE_SHA), connection.getNativeConnection());
 | 
					 | 
				
			||||||
            String sha = connection.scriptingCommands().scriptLoad(SET_VERSIONED_VALUE_LUA_SCRIPT);
 | 
					 | 
				
			||||||
            if (!Arrays.equals(SET_VERSIONED_VALUE_SHA, StringRedisSerializer.UTF_8.serialize(sha))) {
 | 
					 | 
				
			||||||
                log.error("SHA for SET_VERSIONED_VALUE_LUA_SCRIPT wrong! Expected [{}], but actual [{}], connection [{}]", new String(SET_VERSIONED_VALUE_SHA), sha, connection.getNativeConnection());
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        } catch (Throwable t) {
 | 
					 | 
				
			||||||
            log.error("Error on Redis versioned cache init", t);
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    protected byte[] doGet(K key, RedisConnection connection) {
 | 
					    protected byte[] doGet(K key, RedisConnection connection) {
 | 
				
			||||||
        if (!key.isVersioned()) {
 | 
					        if (!key.isVersioned()) {
 | 
				
			||||||
@ -129,21 +113,7 @@ public abstract class VersionedRedisTbCache<K extends VersionedCacheKey, V exten
 | 
				
			|||||||
        byte[] rawValue = getRawValue(value);
 | 
					        byte[] rawValue = getRawValue(value);
 | 
				
			||||||
        byte[] rawVersion = StringRedisSerializer.UTF_8.serialize(String.valueOf(version));
 | 
					        byte[] rawVersion = StringRedisSerializer.UTF_8.serialize(String.valueOf(version));
 | 
				
			||||||
        byte[] rawExpiration = StringRedisSerializer.UTF_8.serialize(String.valueOf(expiration.getExpirationTimeInSeconds()));
 | 
					        byte[] rawExpiration = StringRedisSerializer.UTF_8.serialize(String.valueOf(expiration.getExpirationTimeInSeconds()));
 | 
				
			||||||
        try {
 | 
					        executeScript(connection, SET_VERSIONED_VALUE_SHA, SET_VERSIONED_VALUE_LUA_SCRIPT, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion, rawExpiration);
 | 
				
			||||||
            connection.scriptingCommands().evalSha(SET_VERSIONED_VALUE_SHA, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion, rawExpiration);
 | 
					 | 
				
			||||||
        } catch (InvalidDataAccessApiUsageException e) {
 | 
					 | 
				
			||||||
            log.debug("loading LUA [{}]", connection.getNativeConnection());
 | 
					 | 
				
			||||||
            String sha = connection.scriptingCommands().scriptLoad(SET_VERSIONED_VALUE_LUA_SCRIPT);
 | 
					 | 
				
			||||||
            if (!Arrays.equals(SET_VERSIONED_VALUE_SHA, StringRedisSerializer.UTF_8.serialize(sha))) {
 | 
					 | 
				
			||||||
                log.error("SHA for SET_VERSIONED_VALUE_LUA_SCRIPT wrong! Expected [{}], but actual [{}]", new String(SET_VERSIONED_VALUE_SHA), sha);
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
            try {
 | 
					 | 
				
			||||||
                connection.scriptingCommands().evalSha(SET_VERSIONED_VALUE_SHA, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion, rawExpiration);
 | 
					 | 
				
			||||||
            } catch (InvalidDataAccessApiUsageException ignored) {
 | 
					 | 
				
			||||||
                log.debug("Slowly executing eval instead of fast evalsha");
 | 
					 | 
				
			||||||
                connection.scriptingCommands().eval(SET_VERSIONED_VALUE_LUA_SCRIPT, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion, rawExpiration);
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
 | 
				
			|||||||
@ -39,7 +39,7 @@ public class RelatedEdgesCacheKey implements Serializable {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public String toString() {
 | 
					    public String toString() {
 | 
				
			||||||
        return tenantId + "_" + entityId;
 | 
					        return "{" + tenantId + "}" + entityId;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user