Implement cache for the entity creation request

This commit is contained in:
Andrii Shvaika 2023-11-15 15:57:58 +02:00
parent afa54ef277
commit a4bac1e794
17 changed files with 251 additions and 25 deletions

View File

@ -78,6 +78,7 @@ import org.thingsboard.server.dao.device.provision.ProvisionFailedException;
import org.thingsboard.server.dao.device.provision.ProvisionRequest;
import org.thingsboard.server.dao.device.provision.ProvisionResponse;
import org.thingsboard.server.dao.device.provision.ProvisionResponseStatus;
import org.thingsboard.server.dao.exception.EntitiesLimitException;
import org.thingsboard.server.dao.ota.OtaPackageService;
import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.dao.relation.RelationService;
@ -164,6 +165,7 @@ public class DefaultTransportApiService implements TransportApiService {
public void init() {
handlerExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(maxCoreHandlerThreads, "transport-api-service-core-handler"));
}
@PreDestroy
public void destroy() {
if (handlerExecutor != null) {
@ -400,6 +402,13 @@ public class DefaultTransportApiService implements TransportApiService {
} catch (JsonProcessingException e) {
log.warn("[{}] Failed to lookup device by gateway id and name: [{}]", gatewayId, requestMsg.getDeviceName(), e);
throw new RuntimeException(e);
} catch (EntitiesLimitException e) {
log.warn("[{}][{}] API limit exception: [{}]", e.getTenantId(), gatewayId, e.getMessage());
return TransportApiResponseMsg.newBuilder()
.setGetOrCreateDeviceResponseMsg(
GetOrCreateDeviceFromGatewayResponseMsg.newBuilder()
.setError(TransportProtos.TransportApiRequestErrorCode.ENTITY_LIMIT))
.build();
} finally {
deviceCreationLock.unlock();
}

View File

@ -583,6 +583,9 @@ cache:
rateLimits:
timeToLiveInMinutes: "${CACHE_SPECS_RATE_LIMITS_TTL:120}" # Rate limits cache TTL
maxSize: "${CACHE_SPECS_RATE_LIMITS_MAX_SIZE:200000}" # 0 means the cache is disabled
entityLimits:
timeToLiveInMinutes: "${CACHE_SPECS_ENTITY_LIMITS_TTL:5}" # Entity limits cache TTL
maxSize: "${CACHE_SPECS_ENTITY_LIMITS_MAX_SIZE:100000}" # 0 means the cache is disabled
# Spring data parameters
spring.data.redis.repositories.enabled: false # Disable this because it is not required.
@ -875,7 +878,7 @@ transport:
ip_limits_enabled: "${TB_TRANSPORT_IP_RATE_LIMITS_ENABLED:false}"
# Maximum number of connect attempts with invalid credentials
max_wrong_credentials_per_ip: "${TB_TRANSPORT_MAX_WRONG_CREDENTIALS_PER_IP:10}"
# Timeout to expire block IP addresses
# Timeout (in milliseconds) to expire block IP addresses
ip_block_timeout: "${TB_TRANSPORT_IP_BLOCK_TIMEOUT:60000}"
# Local HTTP transport parameters
http:

View File

@ -255,6 +255,12 @@ message GetOrCreateDeviceFromGatewayRequestMsg {
message GetOrCreateDeviceFromGatewayResponseMsg {
DeviceInfoProto deviceInfo = 1;
bytes profileBody = 2;
TransportApiRequestErrorCode error = 3;
}
enum TransportApiRequestErrorCode {
UNKNOWN_TRANSPORT_API_ERROR = 0;
ENTITY_LIMIT = 1;
}
message GetEntityProfileRequestMsg {

View File

@ -49,6 +49,7 @@ public class DataConstants {
public static final String MQTT_TRANSPORT_NAME = "MQTT";
public static final String HTTP_TRANSPORT_NAME = "HTTP";
public static final String SNMP_TRANSPORT_NAME = "SNMP";
public static final String MAXIMUM_NUMBER_OF_DEVICES_REACHED = "Maximum number of devices reached!";
public static final String[] allScopes() {

View File

@ -35,6 +35,7 @@ import io.netty.handler.codec.mqtt.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.transport.TransportService;
@ -215,8 +216,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
@Override
public void onFailure(Throwable t) {
log.warn("[{}][{}][{}] Failed to process device connect command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
logDeviceCreationError(t, deviceName);
}
}, context.getExecutor());
}
@ -248,7 +248,8 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
return future;
}
try {
transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
transportService.process(gateway.getTenantId(),
GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
.setDeviceName(deviceName)
.setDeviceType(deviceType)
.setGatewayIdMSB(gateway.getDeviceId().getId().getMostSignificantBits())
@ -274,9 +275,9 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
}
@Override
public void onError(Throwable e) {
log.warn("[{}][{}][{}] Failed to process device connect command at getDeviceCreationFuture: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, e);
futureToSet.setException(e);
public void onError(Throwable t) {
logDeviceCreationError(t, deviceName);
futureToSet.setException(t);
deviceFutures.remove(deviceName);
}
});
@ -287,6 +288,15 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
}
}
private void logDeviceCreationError(Throwable t, String deviceName) {
if (DataConstants.MAXIMUM_NUMBER_OF_DEVICES_REACHED.equals(t.getMessage())) {
log.info("[{}][{}][{}] Failed to process device connect command: [{}] due to [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName,
DataConstants.MAXIMUM_NUMBER_OF_DEVICES_REACHED);
} else {
log.warn("[{}][{}][{}] Failed to process device connect command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
}
}
protected abstract T newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg);
protected int getMsgId(MqttPublishMessage mqttMsg) {

View File

@ -98,7 +98,7 @@ public interface TransportService {
void process(ValidateDeviceLwM2MCredentialsRequestMsg msg,
TransportServiceCallback<ValidateDeviceCredentialsResponse> callback);
void process(GetOrCreateDeviceFromGatewayRequestMsg msg,
void process(TenantId tenantId, GetOrCreateDeviceFromGatewayRequestMsg msg,
TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse> callback);
void process(ProvisionDeviceRequestMsg msg,

View File

@ -0,0 +1,74 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.limits;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.queue.util.TbTransportComponent;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@Service
@TbTransportComponent
@Slf4j
public class DefaultEntityLimitsCache implements EntityLimitsCache {
private static final int DEVIATION = 10;
private final Cache<EntityLimitKey, Boolean> cache;
public DefaultEntityLimitsCache(@Value("${cache.entityLimits.timeToLiveInMinutes:5}") int ttl,
@Value("${cache.entityLimits.maxSize:100000}") int maxSize) {
// We use the 'random' expiration time to avoid peak loads.
long mainPart = (TimeUnit.MINUTES.toNanos(ttl) / 100) * (100 - DEVIATION);
long randomPart = (TimeUnit.MINUTES.toNanos(ttl) / 100) * DEVIATION;
cache = Caffeine.newBuilder()
.expireAfter(new Expiry<EntityLimitKey, Boolean>() {
@Override
public long expireAfterCreate(@NotNull EntityLimitKey key, @NotNull Boolean value, long currentTime) {
return mainPart + (long) (randomPart * ThreadLocalRandom.current().nextDouble());
}
@Override
public long expireAfterUpdate(@NotNull EntityLimitKey key, @NotNull Boolean value, long currentTime, long currentDuration) {
return currentDuration;
}
@Override
public long expireAfterRead(@NotNull EntityLimitKey key, @NotNull Boolean value, long currentTime, long currentDuration) {
return currentDuration;
}
})
.maximumSize(maxSize)
.build();
}
@Override
public boolean get(EntityLimitKey key) {
var result = cache.getIfPresent(key);
return result != null ? result : false;
}
@Override
public void put(EntityLimitKey key, boolean value) {
cache.put(key, value);
}
}

View File

@ -0,0 +1,27 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.limits;
import lombok.Data;
import org.thingsboard.server.common.data.id.TenantId;
@Data
public class EntityLimitKey {
private final TenantId tenantId;
private final String deviceName;
}

View File

@ -0,0 +1,24 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.limits;
public interface EntityLimitsCache {
boolean get(EntityLimitKey key);
void put(EntityLimitKey key, boolean value);
}

View File

@ -76,6 +76,8 @@ import org.thingsboard.server.common.transport.TransportTenantProfileCache;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.common.transport.limits.EntityLimitKey;
import org.thingsboard.server.common.transport.limits.EntityLimitsCache;
import org.thingsboard.server.common.transport.limits.TransportRateLimitService;
import org.thingsboard.server.common.transport.util.JsonUtils;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -161,6 +163,7 @@ public class DefaultTransportService implements TransportService {
@Value("${transport.stats.enabled:false}")
private boolean statsEnabled;
@Autowired
@Lazy
private TbApiUsageReportClient apiUsageClient;
@ -184,6 +187,8 @@ public class DefaultTransportService implements TransportService {
private final TransportResourceCache transportResourceCache;
private final NotificationRuleProcessor notificationRuleProcessor;
private final EntityLimitsCache entityLimitsCache;
protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiRequestTemplate;
protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;
protected TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> tbCoreMsgProducer;
@ -212,7 +217,8 @@ public class DefaultTransportService implements TransportService {
TransportTenantProfileCache tenantProfileCache,
TransportRateLimitService rateLimitService,
DataDecodingEncodingService dataDecodingEncodingService, SchedulerComponent scheduler, TransportResourceCache transportResourceCache,
ApplicationEventPublisher eventPublisher, NotificationRuleProcessor notificationRuleProcessor) {
ApplicationEventPublisher eventPublisher, NotificationRuleProcessor notificationRuleProcessor,
EntityLimitsCache entityLimitsCache) {
this.partitionService = partitionService;
this.serviceInfoProvider = serviceInfoProvider;
this.queueProvider = queueProvider;
@ -227,6 +233,7 @@ public class DefaultTransportService implements TransportService {
this.transportResourceCache = transportResourceCache;
this.eventPublisher = eventPublisher;
this.notificationRuleProcessor = notificationRuleProcessor;
this.entityLimitsCache = entityLimitsCache;
}
@PostConstruct
@ -249,7 +256,7 @@ public class DefaultTransportService implements TransportService {
}
@AfterStartUp(order = AfterStartUp.TRANSPORT_SERVICE)
private void start() {
public void start() {
mainConsumerExecutor.execute(() -> {
while (!stopped) {
try {
@ -473,24 +480,33 @@ public class DefaultTransportService implements TransportService {
AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor);
}
@Override
public void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg requestMsg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse> callback) {
public void process(TenantId tenantId, TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg requestMsg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse> callback) {
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(requestMsg).build());
log.trace("Processing msg: {}", requestMsg);
ListenableFuture<GetOrCreateDeviceFromGatewayResponse> response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> {
TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg msg = tmp.getValue().getGetOrCreateDeviceResponseMsg();
GetOrCreateDeviceFromGatewayResponse.GetOrCreateDeviceFromGatewayResponseBuilder result = GetOrCreateDeviceFromGatewayResponse.builder();
if (msg.hasDeviceInfo()) {
TransportDeviceInfo tdi = getTransportDeviceInfo(msg.getDeviceInfo());
result.deviceInfo(tdi);
ByteString profileBody = msg.getProfileBody();
if (profileBody != null && !profileBody.isEmpty()) {
result.deviceProfile(deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody));
var key = new EntityLimitKey(tenantId, StringUtils.truncate(requestMsg.getDeviceName(), 256));
if (entityLimitsCache.get(key)) {
transportCallbackExecutor.submit(() -> callback.onError(new RuntimeException(DataConstants.MAXIMUM_NUMBER_OF_DEVICES_REACHED)));
} else {
ListenableFuture<GetOrCreateDeviceFromGatewayResponse> response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> {
TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg msg = tmp.getValue().getGetOrCreateDeviceResponseMsg();
GetOrCreateDeviceFromGatewayResponse.GetOrCreateDeviceFromGatewayResponseBuilder result = GetOrCreateDeviceFromGatewayResponse.builder();
if (msg.hasDeviceInfo()) {
TransportDeviceInfo tdi = getTransportDeviceInfo(msg.getDeviceInfo());
result.deviceInfo(tdi);
ByteString profileBody = msg.getProfileBody();
if (!profileBody.isEmpty()) {
result.deviceProfile(deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody));
}
} else if (TransportProtos.TransportApiRequestErrorCode.ENTITY_LIMIT.equals(msg.getError())) {
entityLimitsCache.put(key, true);
throw new RuntimeException(DataConstants.MAXIMUM_NUMBER_OF_DEVICES_REACHED);
}
}
return result.build();
}, MoreExecutors.directExecutor());
AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor);
return result.build();
}, MoreExecutors.directExecutor());
AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor);
}
}
@Override

View File

@ -0,0 +1,35 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.exception;
import lombok.Getter;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.TenantId;
public class EntitiesLimitException extends DataValidationException {
private static final long serialVersionUID = -9211462514373279196L;
@Getter
private final TenantId tenantId;
@Getter
private final EntityType entityType;
public EntitiesLimitException(TenantId tenantId, EntityType entityType) {
super(entityType.getNormalName() + "s limit reached");
this.tenantId = tenantId;
this.entityType = entityType;
}
}

View File

@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.TenantEntityWithDataDao;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.exception.EntitiesLimitException;
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
import java.util.HashSet;
@ -117,7 +118,7 @@ public abstract class DataValidator<D extends BaseData<?>> {
protected void validateNumberOfEntitiesPerTenant(TenantId tenantId,
EntityType entityType) {
if (!apiLimitService.checkEntitiesLimit(tenantId, entityType)) {
throw new DataValidationException(entityType.getNormalName() + "s limit reached");
throw new EntitiesLimitException(tenantId, entityType);
}
}

View File

@ -49,6 +49,10 @@ zk:
cache:
# caffeine or redis
type: "${CACHE_TYPE:redis}"
# Deliberately placed outside the 'specs' group above
entityLimits:
timeToLiveInMinutes: "${CACHE_SPECS_ENTITY_LIMITS_TTL:5}" # Entity limits cache TTL
maxSize: "${CACHE_SPECS_ENTITY_LIMITS_MAX_SIZE:100000}" # 0 means the cache is disabled
# Redis configuration parameters
redis:

View File

@ -79,6 +79,10 @@ zk:
cache:
# caffeine or redis
type: "${CACHE_TYPE:redis}"
# Deliberately placed outside the 'specs' group above
entityLimits:
timeToLiveInMinutes: "${CACHE_SPECS_ENTITY_LIMITS_TTL:5}" # Entity limits cache TTL
maxSize: "${CACHE_SPECS_ENTITY_LIMITS_MAX_SIZE:100000}" # 0 means the cache is disabled
# Redis configuration parameters
redis:

View File

@ -49,6 +49,10 @@ zk:
cache:
# caffeine or redis
type: "${CACHE_TYPE:redis}"
# Deliberately placed outside the 'specs' group above
entityLimits:
timeToLiveInMinutes: "${CACHE_SPECS_ENTITY_LIMITS_TTL:5}" # Entity limits cache TTL
maxSize: "${CACHE_SPECS_ENTITY_LIMITS_MAX_SIZE:100000}" # 0 means the cache is disabled
# Redis configuration parameters
redis:

View File

@ -49,6 +49,10 @@ zk:
cache:
# caffeine or redis
type: "${CACHE_TYPE:redis}"
# Deliberately placed outside the 'specs' group above
entityLimits:
timeToLiveInMinutes: "${CACHE_SPECS_ENTITY_LIMITS_TTL:5}" # Entity limits cache TTL
maxSize: "${CACHE_SPECS_ENTITY_LIMITS_MAX_SIZE:100000}" # 0 means the cache is disabled
# Redis configuration parameters
redis:

View File

@ -49,6 +49,10 @@ zk:
cache:
# caffeine or redis
type: "${CACHE_TYPE:redis}"
# Deliberately placed outside the 'specs' group above
entityLimits:
timeToLiveInMinutes: "${CACHE_SPECS_ENTITY_LIMITS_TTL:5}" # Entity limits cache TTL
maxSize: "${CACHE_SPECS_ENTITY_LIMITS_MAX_SIZE:100000}" # 0 means the cache is disabled
# Redis configuration parameters
redis: