diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 0b28aca7b0..b0830317a7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -884,10 +884,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { return; } log.debug("[{}] Restoring sessions from cache", deviceId); - DeviceSessionsCacheEntry sessionsDump = null; + DeviceSessionsCacheEntry sessionsDump; try { - sessionsDump = DeviceSessionsCacheEntry.parseFrom(systemContext.getDeviceSessionCacheService().get(deviceId)); - } catch (InvalidProtocolBufferException e) { + sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId); + } catch (Exception e) { log.warn("[{}] Failed to decode device sessions from cache", deviceId); return; } @@ -942,7 +942,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { }); systemContext.getDeviceSessionCacheService() .put(deviceId, DeviceSessionsCacheEntry.newBuilder() - .addAllSessions(sessionsList).build().toByteArray()); + .addAllSessions(sessionsList).build()); } void init(TbActorCtx ctx) { diff --git a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java index bf8af38e5f..9e5667aa19 100644 --- a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java +++ b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java @@ -15,14 +15,18 @@ */ package org.thingsboard.server.service.session; +import com.google.protobuf.InvalidProtocolBufferException; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.CachePut; import org.springframework.cache.annotation.Cacheable; import org.springframework.stereotype.Service; +import org.thingsboard.server.cache.TbTransactionalCache; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry; import org.thingsboard.server.queue.util.TbCoreComponent; +import java.io.Serializable; import java.util.Collections; import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE; @@ -35,17 +39,20 @@ import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE; @Slf4j public class DefaultDeviceSessionCacheService implements DeviceSessionCacheService { + @Autowired + protected TbTransactionalCache cache; + @Override - @Cacheable(cacheNames = SESSIONS_CACHE, key = "#deviceId.toString()") - public byte[] get(DeviceId deviceId) { + public DeviceSessionsCacheEntry get(DeviceId deviceId) { log.debug("[{}] Fetching session data from cache", deviceId); - return DeviceSessionsCacheEntry.newBuilder().addAllSessions(Collections.emptyList()).build().toByteArray(); + return cache.getAndPutInTransaction(deviceId, () -> + DeviceSessionsCacheEntry.newBuilder().addAllSessions(Collections.emptyList()).build(), false); } @Override - @CachePut(cacheNames = SESSIONS_CACHE, key = "#deviceId.toString()") - public byte[] put(DeviceId deviceId, byte[] sessions) { + public DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions) { log.debug("[{}] Pushing session data to cache: {}", deviceId, sessions); + cache.putIfAbsent(deviceId, sessions); return sessions; } } diff --git a/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java index 158666048c..b01f94306a 100644 --- a/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java +++ b/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java @@ -23,8 +23,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheE */ public interface DeviceSessionCacheService { - byte[] get(DeviceId deviceId); + DeviceSessionsCacheEntry get(DeviceId deviceId); - byte[] put(DeviceId deviceId, byte[] sessions); + DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions); } diff --git a/application/src/main/java/org/thingsboard/server/service/session/SessionCaffeineCache.java b/application/src/main/java/org/thingsboard/server/service/session/SessionCaffeineCache.java new file mode 100644 index 0000000000..f8177c22db --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/session/SessionCaffeineCache.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.session; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cache.CacheManager; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.CacheConstants; +import org.thingsboard.server.common.data.DeviceInfo; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.dao.asset.AssetCacheKey; +import org.thingsboard.server.dao.cache.CaffeineTbTransactionalCache; +import org.thingsboard.server.gen.transport.TransportProtos; + +@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true) +@Service("SessionCache") +public class SessionCaffeineCache extends CaffeineTbTransactionalCache { + + public SessionCaffeineCache(CacheManager cacheManager) { + super(cacheManager, CacheConstants.SESSIONS_CACHE); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/session/SessionRedisCache.java b/application/src/main/java/org/thingsboard/server/service/session/SessionRedisCache.java new file mode 100644 index 0000000000..210410fe8e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/session/SessionRedisCache.java @@ -0,0 +1,55 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.session; + +import com.google.protobuf.InvalidProtocolBufferException; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.serializer.SerializationException; +import org.springframework.stereotype.Service; +import org.thingsboard.server.cache.CacheSpecsMap; +import org.thingsboard.server.cache.TBRedisCacheConfiguration; +import org.thingsboard.server.common.data.CacheConstants; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.dao.asset.AssetCacheKey; +import org.thingsboard.server.dao.cache.RedisTbTransactionalCache; +import org.thingsboard.server.dao.cache.TbRedisSerializer; +import org.thingsboard.server.gen.transport.TransportProtos; + +@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") +@Service("SessionCache") +public class SessionRedisCache extends RedisTbTransactionalCache { + + public SessionRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) { + super(CacheConstants.ASSET_CACHE, cacheSpecsMap, connectionFactory, configuration, new RedisSerializer<>() { + @Override + public byte[] serialize(TransportProtos.DeviceSessionsCacheEntry deviceSessionsCacheEntry) throws SerializationException { + return deviceSessionsCacheEntry.toByteArray(); + } + + @Override + public TransportProtos.DeviceSessionsCacheEntry deserialize(byte[] bytes) throws SerializationException { + try { + return TransportProtos.DeviceSessionsCacheEntry.parseFrom(bytes); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Failed to deserialize session cache entry"); + } + } + }); + } +} diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java index cc43f0c537..92ab0489c7 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java @@ -27,6 +27,8 @@ public interface TbTransactionalCache get(K key); + void put(K key, V value); + void putIfAbsent(K key, V value); void evict(K key); diff --git a/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbTransactionalCache.java b/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbTransactionalCache.java index 59e5b52f28..d9fb5cd774 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbTransactionalCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cache/CaffeineTbTransactionalCache.java @@ -50,6 +50,17 @@ public abstract class CaffeineTbTransactionalCache