From e9bf5bae29318d6b0a63dd104d365a522b24508d Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 19 Oct 2020 11:50:06 +0300 Subject: [PATCH] Rate limit improvements --- .../server/common/msg/tools/TbRateLimits.java | 4 + .../queue/util/TbTransportComponent.java | 26 +++++++ .../common/transport/TransportService.java | 3 + .../DefaultTransportRateLimitService.java | 74 +++++++++++-------- .../limits/DummyTransportRateLimit.java | 5 ++ .../limits/SimpleTransportRateLimit.java | 4 + .../transport/limits/TransportRateLimit.java | 2 + .../limits/TransportRateLimitService.java | 4 +- .../limits/TransportRateLimitType.java | 24 ++++-- .../DefaultTransportDeviceProfileCache.java | 3 +- .../service/DefaultTransportService.java | 44 +++++------ .../DefaultTransportTenantProfileCache.java | 3 +- 12 files changed, 136 insertions(+), 60 deletions(-) create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/util/TbTransportComponent.java diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimits.java b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimits.java index 3c79895c8b..3550bc44a9 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimits.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/tools/TbRateLimits.java @@ -50,4 +50,8 @@ public class TbRateLimits { return bucket.tryConsume(1); } + public boolean tryConsume(long number) { + return bucket.tryConsume(number); + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/TbTransportComponent.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/TbTransportComponent.java new file mode 100644 index 0000000000..dde1c6a620 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/util/TbTransportComponent.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.util; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +@Retention(RetentionPolicy.RUNTIME) +@ConditionalOnExpression("('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'") +public @interface TbTransportComponent { +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index 9a2fd75d47..8f57db3897 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; +import org.thingsboard.server.common.transport.limits.TransportRateLimitType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; @@ -69,6 +70,8 @@ public interface TransportService { boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback callback); + boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback callback, int dataPoints, TransportRateLimitType... limits); + void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback callback); void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback callback); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitService.java index 2bf2f6a697..569f5446db 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DefaultTransportRateLimitService.java @@ -16,7 +16,6 @@ package org.thingsboard.server.common.transport.limits; import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.TenantProfileData; @@ -24,12 +23,13 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.transport.TransportTenantProfileCache; import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult; +import org.thingsboard.server.queue.util.TbTransportComponent; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @Service -@ConditionalOnExpression("('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'") +@TbTransportComponent @Slf4j public class DefaultTransportRateLimitService implements TransportRateLimitService { @@ -45,23 +45,21 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi } @Override - public TransportRateLimit getRateLimit(TenantId tenantId, TransportRateLimitType limitType) { - TransportRateLimit[] limits = perTenantLimits.get(tenantId); - if (limits == null) { - limits = fetchProfileAndInit(tenantId); - perTenantLimits.put(tenantId, limits); + public TransportRateLimitType checkLimits(TenantId tenantId, DeviceId deviceId, int dataPoints, TransportRateLimitType... limits) { + TransportRateLimit[] tenantLimits = getTenantRateLimits(tenantId); + TransportRateLimit[] deviceLimits = getDeviceRateLimits(tenantId, deviceId); + for (TransportRateLimitType limitType : limits) { + TransportRateLimit rateLimit; + if (limitType.isTenantLevel()) { + rateLimit = tenantLimits[limitType.ordinal()]; + } else { + rateLimit = deviceLimits[limitType.ordinal()]; + } + if (!rateLimit.tryConsume(limitType.isMessageLevel() ? 1L : dataPoints)) { + return limitType; + } } - return limits[limitType.ordinal()]; - } - - @Override - public TransportRateLimit getRateLimit(TenantId tenantId, DeviceId deviceId, TransportRateLimitType limitType) { - TransportRateLimit[] limits = perDeviceLimits.get(deviceId); - if (limits == null) { - limits = fetchProfileAndInit(tenantId); - perDeviceLimits.put(deviceId, limits); - } - return limits[limitType.ordinal()]; + return null; } @Override @@ -77,7 +75,17 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi mergeLimits(tenantId, fetchProfileAndInit(tenantId)); } - public void mergeLimits(TenantId tenantId, TransportRateLimit[] newRateLimits) { + @Override + public void remove(TenantId tenantId) { + perTenantLimits.remove(tenantId); + } + + @Override + public void remove(DeviceId deviceId) { + perDeviceLimits.remove(deviceId); + } + + private void mergeLimits(TenantId tenantId, TransportRateLimit[] newRateLimits) { TransportRateLimit[] oldRateLimits = perTenantLimits.get(tenantId); if (oldRateLimits == null) { perTenantLimits.put(tenantId, newRateLimits); @@ -92,16 +100,6 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi } } - @Override - public void remove(TenantId tenantId) { - perTenantLimits.remove(tenantId); - } - - @Override - public void remove(DeviceId deviceId) { - perDeviceLimits.remove(deviceId); - } - private TransportRateLimit[] fetchProfileAndInit(TenantId tenantId) { return perTenantLimits.computeIfAbsent(tenantId, tmp -> createTransportRateLimits(tenantProfileCache.get(tenantId))); } @@ -114,4 +112,22 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi } return rateLimits; } + + private TransportRateLimit[] getTenantRateLimits(TenantId tenantId) { + TransportRateLimit[] limits = perTenantLimits.get(tenantId); + if (limits == null) { + limits = fetchProfileAndInit(tenantId); + perTenantLimits.put(tenantId, limits); + } + return limits; + } + + private TransportRateLimit[] getDeviceRateLimits(TenantId tenantId, DeviceId deviceId) { + TransportRateLimit[] limits = perDeviceLimits.get(deviceId); + if (limits == null) { + limits = fetchProfileAndInit(tenantId); + perDeviceLimits.put(deviceId, limits); + } + return limits; + } } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DummyTransportRateLimit.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DummyTransportRateLimit.java index d6d58d55ba..a93487632f 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DummyTransportRateLimit.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/DummyTransportRateLimit.java @@ -22,6 +22,11 @@ public class DummyTransportRateLimit implements TransportRateLimit { return ""; } + @Override + public boolean tryConsume(long number) { + return true; + } + @Override public boolean tryConsume() { return true; diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/SimpleTransportRateLimit.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/SimpleTransportRateLimit.java index 08fbb7ec5d..3253272ded 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/SimpleTransportRateLimit.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/SimpleTransportRateLimit.java @@ -31,4 +31,8 @@ public class SimpleTransportRateLimit implements TransportRateLimit { return rateLimit.tryConsume(); } + @Override + public boolean tryConsume(long number) { + return number <= 0 || rateLimit.tryConsume(number); + } } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimit.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimit.java index 0901e1becc..a2eea81d3a 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimit.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimit.java @@ -21,4 +21,6 @@ public interface TransportRateLimit { boolean tryConsume(); + boolean tryConsume(long number); + } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitService.java index 2f2e808a36..a97fbfc61d 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitService.java @@ -21,9 +21,7 @@ import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult public interface TransportRateLimitService { - TransportRateLimit getRateLimit(TenantId tenantId, TransportRateLimitType limit); - - TransportRateLimit getRateLimit(TenantId tenantId, DeviceId deviceId, TransportRateLimitType limit); + TransportRateLimitType checkLimits(TenantId tenantId, DeviceId deviceId, int dataPoints, TransportRateLimitType... limits); void update(TenantProfileUpdateResult update); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitType.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitType.java index becc5fbc86..a3e6da6683 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitType.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitType.java @@ -19,15 +19,29 @@ import lombok.Getter; public enum TransportRateLimitType { - TENANT_MAX_MSGS("transport.tenant.max.msg"), - TENANT_MAX_DATA_POINTS("transport.tenant.max.dataPoints"), - DEVICE_MAX_MSGS("transport.device.max.msg"), - DEVICE_MAX_DATA_POINTS("transport.device.max.dataPoints"); + TENANT_MAX_MSGS("transport.tenant.msg", true, true), + TENANT_TELEMETRY_MSGS("transport.tenant.telemetry", true, true), + TENANT_MAX_DATA_POINTS("transport.tenant.dataPoints", true, false), + DEVICE_MAX_MSGS("transport.device.msg", false, true), + DEVICE_TELEMETRY_MSGS("transport.device.telemetry", false, true), + DEVICE_MAX_DATA_POINTS("transport.device.dataPoints", false, false); @Getter private final String configurationKey; + @Getter + private final boolean tenantLevel; + @Getter + private final boolean deviceLevel; + @Getter + private final boolean messageLevel; + @Getter + private final boolean dataPointLevel; - TransportRateLimitType(String configurationKey) { + TransportRateLimitType(String configurationKey, boolean tenantLevel, boolean messageLevel) { this.configurationKey = configurationKey; + this.tenantLevel = tenantLevel; + this.deviceLevel = !tenantLevel; + this.messageLevel = messageLevel; + this.dataPointLevel = !messageLevel; } } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportDeviceProfileCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportDeviceProfileCache.java index 3b44fd6a54..b12aab1a8c 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportDeviceProfileCache.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportDeviceProfileCache.java @@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.transport.TransportDeviceProfileCache; import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; +import org.thingsboard.server.queue.util.TbTransportComponent; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -30,7 +31,7 @@ import java.util.concurrent.ConcurrentMap; @Slf4j @Component -@ConditionalOnExpression("('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'") +@TbTransportComponent public class DefaultTransportDeviceProfileCache implements TransportDeviceProfileCache { private final ConcurrentMap deviceProfiles = new ConcurrentHashMap<>(); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 7631658602..3f37685383 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -79,6 +79,7 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.provider.TbTransportQueueFactory; +import org.thingsboard.server.queue.util.TbTransportComponent; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -103,7 +104,7 @@ import java.util.concurrent.atomic.AtomicInteger; */ @Slf4j @Service -@ConditionalOnExpression("('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'") +@TbTransportComponent public class DefaultTransportService implements TransportService { @Value("${transport.sessions.inactivity_timeout}") @@ -363,7 +364,11 @@ public class DefaultTransportService implements TransportService { @Override public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, msg, callback)) { + int dataPoints = 0; + for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { + dataPoints += tsKv.getKvCount(); + } + if (checkLimits(sessionInfo, msg, callback, dataPoints, TELEMETRY)) { reportActivityInternal(sessionInfo); TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); @@ -384,7 +389,7 @@ public class DefaultTransportService implements TransportService { @Override public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, msg, callback)) { + if (checkLimits(sessionInfo, msg, callback, msg.getKvCount(), TELEMETRY)) { reportActivityInternal(sessionInfo); TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); @@ -574,37 +579,34 @@ public class DefaultTransportService implements TransportService { sessions.remove(toSessionId(sessionInfo)); } + private TransportRateLimitType[] DEFAULT = new TransportRateLimitType[]{TransportRateLimitType.TENANT_MAX_MSGS, TransportRateLimitType.DEVICE_MAX_MSGS}; + private TransportRateLimitType[] TELEMETRY = TransportRateLimitType.values(); + @Override public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback callback) { + return checkLimits(sessionInfo, msg, callback, 0, DEFAULT); + } + + @Override + public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback callback, int dataPoints, TransportRateLimitType... limits) { if (log.isTraceEnabled()) { log.trace("[{}] Processing msg: {}", toSessionId(sessionInfo), msg); } TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); - - TransportRateLimit tenantRateLimit = rateLimitService.getRateLimit(tenantId, TransportRateLimitType.TENANT_MAX_MSGS); - - if (!tenantRateLimit.tryConsume()) { - if (callback != null) { - callback.onError(new TbRateLimitsException(EntityType.TENANT)); - } - if (log.isTraceEnabled()) { - log.trace("[{}][{}] Tenant level rate limit detected: {}", toSessionId(sessionInfo), tenantId, msg); - } - return false; - } DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); - TransportRateLimit deviceRateLimit = rateLimitService.getRateLimit(tenantId, deviceId, TransportRateLimitType.DEVICE_MAX_MSGS); - if (!deviceRateLimit.tryConsume()) { + + TransportRateLimitType limit = rateLimitService.checkLimits(tenantId, deviceId, 0, limits); + if (limit == null) { + return true; + } else { if (callback != null) { - callback.onError(new TbRateLimitsException(EntityType.DEVICE)); + callback.onError(new TbRateLimitsException(limit.isTenantLevel() ? EntityType.TENANT : EntityType.DEVICE)); } if (log.isTraceEnabled()) { - log.trace("[{}][{}] Device level rate limit detected: {}", toSessionId(sessionInfo), deviceId, msg); + log.trace("[{}][{}] {} rateLimit detected: {}", toSessionId(sessionInfo), tenantId, limit, msg); } return false; } - - return true; } protected void processToTransportMsg(TransportProtos.ToTransportMsg toSessionMsg) { diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java index 717627a60e..784f7b3ae8 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.TenantRoutingInfo; import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; +import org.thingsboard.server.queue.util.TbTransportComponent; import java.util.Collections; import java.util.Optional; @@ -43,7 +44,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @Component -@ConditionalOnExpression("('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'") +@TbTransportComponent @Slf4j public class DefaultTransportTenantProfileCache implements TransportTenantProfileCache {