FirmwareCache refactoring

This commit is contained in:
YevhenBondarenko 2021-04-28 11:29:04 +03:00
parent 1850024fdb
commit aee3fd3b6e
13 changed files with 64 additions and 169 deletions

View File

@ -23,11 +23,10 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.cache.firmware.FirmwareCacheWriter; import org.thingsboard.server.cache.firmware.FirmwareDataCache;
import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
@ -61,7 +60,6 @@ import org.thingsboard.server.dao.device.provision.ProvisionRequest;
import org.thingsboard.server.dao.device.provision.ProvisionResponse; import org.thingsboard.server.dao.device.provision.ProvisionResponse;
import org.thingsboard.server.dao.firmware.FirmwareService; import org.thingsboard.server.dao.firmware.FirmwareService;
import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.resource.ResourceService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
@ -116,7 +114,7 @@ public class DefaultTransportApiService implements TransportApiService {
private final DeviceProvisionService deviceProvisionService; private final DeviceProvisionService deviceProvisionService;
private final TbResourceService resourceService; private final TbResourceService resourceService;
private final FirmwareService firmwareService; private final FirmwareService firmwareService;
private final FirmwareCacheWriter firmwareCacheWriter; private final FirmwareDataCache firmwareDataCache;
private final ConcurrentMap<String, ReentrantLock> deviceCreationLocks = new ConcurrentHashMap<>(); private final ConcurrentMap<String, ReentrantLock> deviceCreationLocks = new ConcurrentHashMap<>();
@ -125,7 +123,7 @@ public class DefaultTransportApiService implements TransportApiService {
RelationService relationService, DeviceCredentialsService deviceCredentialsService, RelationService relationService, DeviceCredentialsService deviceCredentialsService,
DeviceStateService deviceStateService, DbCallbackExecutorService dbCallbackExecutorService, DeviceStateService deviceStateService, DbCallbackExecutorService dbCallbackExecutorService,
TbClusterService tbClusterService, DataDecodingEncodingService dataDecodingEncodingService, TbClusterService tbClusterService, DataDecodingEncodingService dataDecodingEncodingService,
DeviceProvisionService deviceProvisionService, TbResourceService resourceService, FirmwareService firmwareService, FirmwareCacheWriter firmwareCacheWriter) { DeviceProvisionService deviceProvisionService, TbResourceService resourceService, FirmwareService firmwareService, FirmwareDataCache firmwareDataCache) {
this.deviceProfileCache = deviceProfileCache; this.deviceProfileCache = deviceProfileCache;
this.tenantProfileCache = tenantProfileCache; this.tenantProfileCache = tenantProfileCache;
this.apiUsageStateService = apiUsageStateService; this.apiUsageStateService = apiUsageStateService;
@ -139,7 +137,7 @@ public class DefaultTransportApiService implements TransportApiService {
this.deviceProvisionService = deviceProvisionService; this.deviceProvisionService = deviceProvisionService;
this.resourceService = resourceService; this.resourceService = resourceService;
this.firmwareService = firmwareService; this.firmwareService = firmwareService;
this.firmwareCacheWriter = firmwareCacheWriter; this.firmwareDataCache = firmwareDataCache;
} }
@Override @Override
@ -485,7 +483,7 @@ public class DefaultTransportApiService implements TransportApiService {
builder.setVersion(firmware.getVersion()); builder.setVersion(firmware.getVersion());
builder.setFileName(firmware.getFileName()); builder.setFileName(firmware.getFileName());
builder.setContentType(firmware.getContentType()); builder.setContentType(firmware.getContentType());
firmwareCacheWriter.put(firmwareId.toString(), firmware.getData().array()); firmwareDataCache.put(firmwareId.toString(), firmware.getData().array());
} }
} }

View File

@ -1,33 +0,0 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.cache.firmware;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import static org.thingsboard.server.common.data.CacheConstants.FIRMWARE_CACHE;
public abstract class AbstractRedisFirmwareCache {
protected final RedisConnectionFactory redisConnectionFactory;
protected AbstractRedisFirmwareCache(RedisConnectionFactory redisConnectionFactory) {
this.redisConnectionFactory = redisConnectionFactory;
}
protected byte[] toFirmwareCacheKey(String key) {
return String.format("%s::%s", FIRMWARE_CACHE, key).getBytes();
}
}

View File

@ -15,22 +15,20 @@
*/ */
package org.thingsboard.server.cache.firmware; package org.thingsboard.server.cache.firmware;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.CacheManager; import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import static org.thingsboard.server.common.data.CacheConstants.FIRMWARE_CACHE; import static org.thingsboard.server.common.data.CacheConstants.FIRMWARE_CACHE;
@Service @Service
@ConditionalOnExpression("(('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport') && ('${cache.type:null}'=='caffeine' || '${cache.type:null}'=='null')") @ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true)
public class CaffeineFirmwareCacheReader implements FirmwareCacheReader { @RequiredArgsConstructor
public class CaffeineFirmwareCache implements FirmwareDataCache {
private final CacheManager cacheManager; private final CacheManager cacheManager;
public CaffeineFirmwareCacheReader(CacheManager cacheManager) {
this.cacheManager = cacheManager;
}
@Override @Override
public byte[] get(String key) { public byte[] get(String key) {
return get(key, 0, 0); return get(key, 0, 0);
@ -57,4 +55,14 @@ public class CaffeineFirmwareCacheReader implements FirmwareCacheReader {
} }
return new byte[0]; return new byte[0];
} }
@Override
public void put(String key, byte[] value) {
cacheManager.getCache(FIRMWARE_CACHE).putIfAbsent(key, value);
}
@Override
public void evict(String key) {
cacheManager.getCache(FIRMWARE_CACHE).evict(key);
}
} }

View File

@ -1,38 +0,0 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.cache.firmware;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service;
import static org.thingsboard.server.common.data.CacheConstants.FIRMWARE_CACHE;
@Service
@ConditionalOnExpression("('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core') && ('${cache.type:null}'=='caffeine' || '${cache.type:null}'=='null')")
public class CaffeineFirmwareCacheWriter implements FirmwareCacheWriter {
private final CacheManager cacheManager;
public CaffeineFirmwareCacheWriter(CacheManager cacheManager) {
this.cacheManager = cacheManager;
}
@Override
public void put(String key, byte[] value) {
cacheManager.getCache(FIRMWARE_CACHE).putIfAbsent(key, value);
}
}

View File

@ -1,20 +0,0 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.cache.firmware;
public interface FirmwareCacheWriter {
void put(String key, byte[] value);
}

View File

@ -15,8 +15,13 @@
*/ */
package org.thingsboard.server.cache.firmware; package org.thingsboard.server.cache.firmware;
public interface FirmwareCacheReader { public interface FirmwareDataCache {
byte[] get(String key); byte[] get(String key);
byte[] get(String key, int chunkSize, int chunk); byte[] get(String key, int chunkSize, int chunk);
void put(String key, byte[] value);
void evict(String key);
} }

View File

@ -1,38 +0,0 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.cache.firmware;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.stereotype.Service;
@Service
@ConditionalOnExpression("('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core') && '${cache.type:null}'=='redis'")
public class RedisFirmwareCacheWriter extends AbstractRedisFirmwareCache implements FirmwareCacheWriter {
public RedisFirmwareCacheWriter(RedisConnectionFactory redisConnectionFactory) {
super(redisConnectionFactory);
}
@Override
public void put(String key, byte[] value) {
try (RedisConnection connection = redisConnectionFactory.getConnection()) {
connection.set(toFirmwareCacheKey(key), value);
}
}
}

View File

@ -15,18 +15,20 @@
*/ */
package org.thingsboard.server.cache.firmware; package org.thingsboard.server.cache.firmware;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@Service import static org.thingsboard.server.common.data.CacheConstants.FIRMWARE_CACHE;
@ConditionalOnExpression("(('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport') && '${cache.type:null}'=='redis'")
public class RedisFirmwareCacheReader extends AbstractRedisFirmwareCache implements FirmwareCacheReader {
public RedisFirmwareCacheReader(RedisConnectionFactory redisConnectionFactory) { @Service
super(redisConnectionFactory); @ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
} @RequiredArgsConstructor
public class RedisFirmwareDataCache implements FirmwareDataCache {
private final RedisConnectionFactory redisConnectionFactory;
@Override @Override
public byte[] get(String key) { public byte[] get(String key) {
@ -46,4 +48,21 @@ public class RedisFirmwareCacheReader extends AbstractRedisFirmwareCache impleme
} }
} }
@Override
public void put(String key, byte[] value) {
try (RedisConnection connection = redisConnectionFactory.getConnection()) {
connection.set(toFirmwareCacheKey(key), value);
}
}
@Override
public void evict(String key) {
try (RedisConnection connection = redisConnectionFactory.getConnection()) {
connection.del(toFirmwareCacheKey(key));
}
}
private byte[] toFirmwareCacheKey(String key) {
return String.format("%s::%s", FIRMWARE_CACHE, key).getBytes();
}
} }

View File

@ -452,7 +452,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
String strChunk = exchange.getQueryParameter("chunk"); String strChunk = exchange.getQueryParameter("chunk");
int chunkSize = StringUtils.isEmpty(strChunkSize) ? 0 : Integer.parseInt(strChunkSize); int chunkSize = StringUtils.isEmpty(strChunkSize) ? 0 : Integer.parseInt(strChunkSize);
int chunk = StringUtils.isEmpty(strChunk) ? 0 : Integer.parseInt(strChunk); int chunk = StringUtils.isEmpty(strChunk) ? 0 : Integer.parseInt(strChunk);
exchange.respond(CoAP.ResponseCode.CONTENT, transportContext.getFirmwareCacheReader().get(firmwareId, chunkSize, chunk)); exchange.respond(CoAP.ResponseCode.CONTENT, transportContext.getFirmwareDataCache().get(firmwareId, chunkSize, chunk));
} else { } else {
exchange.respond(CoAP.ResponseCode.BAD_REQUEST); exchange.respond(CoAP.ResponseCode.BAD_REQUEST);
} }

View File

@ -299,7 +299,7 @@ public class DeviceApiController {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_FOUND)); responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_FOUND));
} else if (title.equals(firmwareResponseMsg.getTitle()) && version.equals(firmwareResponseMsg.getVersion())) { } else if (title.equals(firmwareResponseMsg.getTitle()) && version.equals(firmwareResponseMsg.getVersion())) {
String firmwareId = new UUID(firmwareResponseMsg.getFirmwareIdMSB(), firmwareResponseMsg.getFirmwareIdLSB()).toString(); String firmwareId = new UUID(firmwareResponseMsg.getFirmwareIdMSB(), firmwareResponseMsg.getFirmwareIdLSB()).toString();
ByteArrayResource resource = new ByteArrayResource(transportContext.getFirmwareCacheReader().get(firmwareId, chuckSize, chuck)); ByteArrayResource resource = new ByteArrayResource(transportContext.getFirmwareDataCache().get(firmwareId, chuckSize, chuck));
ResponseEntity<ByteArrayResource> response = ResponseEntity.ok() ResponseEntity<ByteArrayResource> response = ResponseEntity.ok()
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment;filename=" + firmwareResponseMsg.getFileName()) .header(HttpHeaders.CONTENT_DISPOSITION, "attachment;filename=" + firmwareResponseMsg.getFileName())
.header("x-filename", firmwareResponseMsg.getFileName()) .header("x-filename", firmwareResponseMsg.getFileName())

View File

@ -452,7 +452,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
log.trace("[{}] Send firmware [{}] to device!", sessionId, firmwareId); log.trace("[{}] Send firmware [{}] to device!", sessionId, firmwareId);
ack(ctx, msgId); ack(ctx, msgId);
try { try {
byte[] firmwareChunk = context.getFirmwareCacheReader().get(firmwareId, chunkSize, chunk); byte[] firmwareChunk = context.getFirmwareDataCache().get(firmwareId, chunkSize, chunk);
deviceSessionCtx.getPayloadAdaptor() deviceSessionCtx.getPayloadAdaptor()
.convertToPublish(deviceSessionCtx, firmwareChunk, requestId, chunk) .convertToPublish(deviceSessionCtx, firmwareChunk, requestId, chunk)
.ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); .ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);

View File

@ -20,7 +20,7 @@ import lombok.Data;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.cache.firmware.FirmwareCacheReader; import org.thingsboard.server.cache.firmware.FirmwareDataCache;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.queue.scheduler.SchedulerComponent;
@ -53,7 +53,7 @@ public abstract class TransportContext {
@Getter @Getter
@Autowired @Autowired
private FirmwareCacheReader firmwareCacheReader; private FirmwareDataCache firmwareDataCache;
@PostConstruct @PostConstruct
public void init() { public void init() {

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.dao.firmware;
import com.google.common.hash.HashFunction; import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing; import com.google.common.hash.Hashing;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.hibernate.exception.ConstraintViolationException; import org.hibernate.exception.ConstraintViolationException;
@ -24,6 +25,7 @@ import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager; import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.Cacheable; import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.cache.firmware.FirmwareDataCache;
import org.thingsboard.server.common.data.Firmware; import org.thingsboard.server.common.data.Firmware;
import org.thingsboard.server.common.data.FirmwareInfo; import org.thingsboard.server.common.data.FirmwareInfo;
import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.Tenant;
@ -47,6 +49,7 @@ import static org.thingsboard.server.dao.service.Validator.validatePageLink;
@Service @Service
@Slf4j @Slf4j
@RequiredArgsConstructor
public class BaseFirmwareService implements FirmwareService { public class BaseFirmwareService implements FirmwareService {
public static final String INCORRECT_FIRMWARE_ID = "Incorrect firmwareId "; public static final String INCORRECT_FIRMWARE_ID = "Incorrect firmwareId ";
public static final String INCORRECT_TENANT_ID = "Incorrect tenantId "; public static final String INCORRECT_TENANT_ID = "Incorrect tenantId ";
@ -55,13 +58,7 @@ public class BaseFirmwareService implements FirmwareService {
private final FirmwareDao firmwareDao; private final FirmwareDao firmwareDao;
private final FirmwareInfoDao firmwareInfoDao; private final FirmwareInfoDao firmwareInfoDao;
private final CacheManager cacheManager; private final CacheManager cacheManager;
private final FirmwareDataCache firmwareDataCache;
public BaseFirmwareService(TenantDao tenantDao, FirmwareDao firmwareDao, FirmwareInfoDao firmwareInfoDao, CacheManager cacheManager) {
this.tenantDao = tenantDao;
this.firmwareDao = firmwareDao;
this.firmwareInfoDao = firmwareInfoDao;
this.cacheManager = cacheManager;
}
@Override @Override
public FirmwareInfo saveFirmwareInfo(FirmwareInfo firmwareInfo) { public FirmwareInfo saveFirmwareInfo(FirmwareInfo firmwareInfo) {
@ -72,7 +69,7 @@ public class BaseFirmwareService implements FirmwareService {
if (firmwareId != null) { if (firmwareId != null) {
Cache cache = cacheManager.getCache(FIRMWARE_CACHE); Cache cache = cacheManager.getCache(FIRMWARE_CACHE);
cache.evict(toFirmwareInfoKey(firmwareId)); cache.evict(toFirmwareInfoKey(firmwareId));
cache.evict(toFirmwareDataKey(firmwareId)); firmwareDataCache.evict(firmwareId.toString());
} }
return firmwareInfoDao.save(firmwareInfo.getTenantId(), firmwareInfo); return firmwareInfoDao.save(firmwareInfo.getTenantId(), firmwareInfo);
} catch (Exception t) { } catch (Exception t) {
@ -94,7 +91,7 @@ public class BaseFirmwareService implements FirmwareService {
if (firmwareId != null) { if (firmwareId != null) {
Cache cache = cacheManager.getCache(FIRMWARE_CACHE); Cache cache = cacheManager.getCache(FIRMWARE_CACHE);
cache.evict(toFirmwareInfoKey(firmwareId)); cache.evict(toFirmwareInfoKey(firmwareId));
cache.evict(toFirmwareDataKey(firmwareId)); firmwareDataCache.evict(firmwareId.toString());
} }
return firmwareDao.save(firmware.getTenantId(), firmware); return firmwareDao.save(firmware.getTenantId(), firmware);
} catch (Exception t) { } catch (Exception t) {
@ -145,7 +142,7 @@ public class BaseFirmwareService implements FirmwareService {
try { try {
Cache cache = cacheManager.getCache(FIRMWARE_CACHE); Cache cache = cacheManager.getCache(FIRMWARE_CACHE);
cache.evict(toFirmwareInfoKey(firmwareId)); cache.evict(toFirmwareInfoKey(firmwareId));
cache.evict(toFirmwareDataKey(firmwareId)); firmwareDataCache.evict(firmwareId.toString());
firmwareDao.removeById(tenantId, firmwareId.getId()); firmwareDao.removeById(tenantId, firmwareId.getId());
} catch (Exception t) { } catch (Exception t) {
ConstraintViolationException e = extractConstraintViolationException(t).orElse(null); ConstraintViolationException e = extractConstraintViolationException(t).orElse(null);
@ -329,7 +326,4 @@ public class BaseFirmwareService implements FirmwareService {
return Collections.singletonList(firmwareId); return Collections.singletonList(firmwareId);
} }
private static String toFirmwareDataKey(FirmwareId firmwareId) {
return firmwareId.toString();
}
} }