Merge pull request #9678 from thingsboard/feature/serialization-stats
[WIP] FST Stats
This commit is contained in:
		
						commit
						a45b3ab362
					
				@ -17,6 +17,7 @@ package org.thingsboard.server.cache;
 | 
			
		||||
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.cache.support.NullValue;
 | 
			
		||||
import org.springframework.data.redis.connection.RedisConnection;
 | 
			
		||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
 | 
			
		||||
@ -27,6 +28,7 @@ import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
 | 
			
		||||
import org.springframework.data.redis.core.types.Expiration;
 | 
			
		||||
import org.springframework.data.redis.serializer.RedisSerializer;
 | 
			
		||||
import org.springframework.data.redis.serializer.StringRedisSerializer;
 | 
			
		||||
import org.thingsboard.server.common.data.FstStatsService;
 | 
			
		||||
import redis.clients.jedis.Jedis;
 | 
			
		||||
import redis.clients.jedis.JedisPool;
 | 
			
		||||
import redis.clients.jedis.util.JedisClusterCRC16;
 | 
			
		||||
@ -44,6 +46,9 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
 | 
			
		||||
    private static final byte[] BINARY_NULL_VALUE = RedisSerializer.java().serialize(NullValue.INSTANCE);
 | 
			
		||||
    static final JedisPool MOCK_POOL = new JedisPool(); //non-null pool required for JedisConnection to trigger closing jedis connection
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private FstStatsService fstStatsService;
 | 
			
		||||
 | 
			
		||||
    @Getter
 | 
			
		||||
    private final String cacheName;
 | 
			
		||||
    private final JedisConnectionFactory connectionFactory;
 | 
			
		||||
@ -80,6 +85,9 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
 | 
			
		||||
                return SimpleTbCacheValueWrapper.empty();
 | 
			
		||||
            } else {
 | 
			
		||||
                V value = valueSerializer.deserialize(key, rawValue);
 | 
			
		||||
                if (value != null) {
 | 
			
		||||
                    fstStatsService.incrementDecode(value.getClass());
 | 
			
		||||
                }
 | 
			
		||||
                return SimpleTbCacheValueWrapper.wrap(value);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
@ -190,7 +198,9 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
 | 
			
		||||
            return BINARY_NULL_VALUE;
 | 
			
		||||
        } else {
 | 
			
		||||
            try {
 | 
			
		||||
                return valueSerializer.serialize(value);
 | 
			
		||||
                var bytes = valueSerializer.serialize(value);
 | 
			
		||||
                fstStatsService.incrementEncode(value.getClass());
 | 
			
		||||
                return bytes;
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                log.warn("Failed to serialize the cache value: {}", value, e);
 | 
			
		||||
                throw new RuntimeException(e);
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,24 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.data;
 | 
			
		||||
 | 
			
		||||
public interface FstStatsService {
 | 
			
		||||
 | 
			
		||||
    void incrementEncode(Class<?> clazz);
 | 
			
		||||
 | 
			
		||||
    void incrementDecode(Class<?> clazz);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -17,8 +17,10 @@ package org.thingsboard.server.queue.util;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.nustaq.serialization.FSTConfiguration;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.common.data.FSTUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.FstStatsService;
 | 
			
		||||
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
 | 
			
		||||
@ -26,12 +28,17 @@ import java.util.Optional;
 | 
			
		||||
@Service
 | 
			
		||||
public class ProtoWithFSTService implements DataDecodingEncodingService {
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private FstStatsService fstStatsService;
 | 
			
		||||
 | 
			
		||||
    public static final FSTConfiguration CONFIG = FSTConfiguration.createDefaultConfiguration();
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public <T> Optional<T> decode(byte[] byteArray) {
 | 
			
		||||
        try {
 | 
			
		||||
            return Optional.ofNullable(FSTUtils.decode(byteArray));
 | 
			
		||||
            Optional<T> optional = Optional.ofNullable(FSTUtils.decode(byteArray));
 | 
			
		||||
            optional.ifPresent(obj -> fstStatsService.incrementDecode(obj.getClass()));
 | 
			
		||||
            return optional;
 | 
			
		||||
        } catch (IllegalArgumentException e) {
 | 
			
		||||
            log.error("Error during deserialization message, [{}]", e.getMessage());
 | 
			
		||||
            return Optional.empty();
 | 
			
		||||
@ -41,7 +48,9 @@ public class ProtoWithFSTService implements DataDecodingEncodingService {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public <T> byte[] encode(T msq) {
 | 
			
		||||
        return FSTUtils.encode(msq);
 | 
			
		||||
        var bytes = FSTUtils.encode(msq);
 | 
			
		||||
        fstStatsService.incrementEncode(msq.getClass());
 | 
			
		||||
        return bytes;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,42 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.stats;
 | 
			
		||||
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.common.data.FstStatsService;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
 | 
			
		||||
@Service
 | 
			
		||||
public class FstStatsServiceImpl implements FstStatsService {
 | 
			
		||||
    private final ConcurrentHashMap<String, StatsCounter> encodeCounters = new ConcurrentHashMap<>();
 | 
			
		||||
    private final ConcurrentHashMap<String, StatsCounter> decodeCounters = new ConcurrentHashMap<>();
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private StatsFactory statsFactory;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void incrementEncode(Class<?> clazz) {
 | 
			
		||||
        encodeCounters.computeIfAbsent(clazz.getSimpleName(), key -> statsFactory.createStatsCounter("fstEncode", key)).increment();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void incrementDecode(Class<?> clazz) {
 | 
			
		||||
        decodeCounters.computeIfAbsent(clazz.getSimpleName(), key -> statsFactory.createStatsCounter("fstDecode", key)).increment();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user