Merge pull request #10919 from YevhenBondarenko/feature/gateway-device-rate-limits

added rate limits for the gateway device
This commit is contained in:
Viacheslav Klimov 2024-06-05 09:39:04 +03:00 committed by GitHub
commit 9706ec0e13
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 228 additions and 50 deletions

View File

@ -141,6 +141,9 @@ public class TenantProfileController extends BaseController {
" \"transportGatewayMsgRateLimit\": \"20:1,600:60\",\n" +
" \"transportGatewayTelemetryMsgRateLimit\": \"20:1,600:60\",\n" +
" \"transportGatewayTelemetryDataPointsRateLimit\": \"20:1,600:60\",\n" +
" \"transportGatewayDeviceMsgRateLimit\": \"20:1,600:60\",\n" +
" \"transportGatewayDeviceTelemetryMsgRateLimit\": \"20:1,600:60\",\n" +
" \"transportGatewayDeviceTelemetryDataPointsRateLimit\": \"20:1,600:60\",\n" +
" \"maxTransportMessages\": 10000000,\n" +
" \"maxTransportDataPoints\": 10000000,\n" +
" \"maxREExecutions\": 4000000,\n" +

View File

@ -105,18 +105,36 @@ public class DefaultDataUpdateService implements DataUpdateService {
var configurationOpt = tenantProfile.getProfileConfiguration();
configurationOpt.ifPresent(configuration -> {
boolean updated = false;
if (configuration.getTransportDeviceMsgRateLimit() != null && configuration.getTransportGatewayMsgRateLimit() == null) {
if (configuration.getTransportDeviceMsgRateLimit() != null) {
if (configuration.getTransportGatewayMsgRateLimit() == null) {
configuration.setTransportGatewayMsgRateLimit(configuration.getTransportDeviceMsgRateLimit());
updated = true;
}
if (configuration.getTransportDeviceTelemetryMsgRateLimit() != null && configuration.getTransportGatewayTelemetryMsgRateLimit() == null) {
if (configuration.getTransportGatewayDeviceMsgRateLimit() == null) {
configuration.setTransportGatewayDeviceMsgRateLimit(configuration.getTransportDeviceMsgRateLimit());
updated = true;
}
}
if (configuration.getTransportDeviceTelemetryMsgRateLimit() != null) {
if (configuration.getTransportGatewayTelemetryMsgRateLimit() == null) {
configuration.setTransportGatewayTelemetryMsgRateLimit(configuration.getTransportDeviceTelemetryMsgRateLimit());
updated = true;
}
if (configuration.getTransportDeviceTelemetryDataPointsRateLimit() != null && configuration.getTransportGatewayTelemetryDataPointsRateLimit() == null) {
if (configuration.getTransportGatewayDeviceTelemetryMsgRateLimit() == null) {
configuration.setTransportGatewayDeviceTelemetryMsgRateLimit(configuration.getTransportDeviceTelemetryMsgRateLimit());
updated = true;
}
}
if (configuration.getTransportDeviceTelemetryDataPointsRateLimit() != null) {
if (configuration.getTransportGatewayTelemetryDataPointsRateLimit() == null) {
configuration.setTransportGatewayTelemetryDataPointsRateLimit(configuration.getTransportDeviceTelemetryDataPointsRateLimit());
updated = true;
}
if (configuration.getTransportGatewayDeviceTelemetryDataPointsRateLimit() == null) {
configuration.setTransportGatewayDeviceTelemetryDataPointsRateLimit(configuration.getTransportDeviceTelemetryDataPointsRateLimit());
updated = true;
}
}
if (updated) {
try {
tenantProfileService.saveTenantProfile(TenantId.SYS_TENANT_ID, tenantProfile);

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.transport.mqtt;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -25,7 +26,7 @@ import org.springframework.test.context.TestPropertySource;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.limit.LimitedApi;
import org.thingsboard.server.common.data.notification.rule.trigger.RateLimitsTrigger;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
@ -34,12 +35,14 @@ import org.thingsboard.server.controller.AbstractControllerTest;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.eq;
import static org.thingsboard.server.common.data.limit.LimitedApi.TRANSPORT_MESSAGES_PER_GATEWAY;
import static org.thingsboard.server.common.data.limit.LimitedApi.TRANSPORT_MESSAGES_PER_GATEWAY_DEVICE;
@DaoSqlTest
@TestPropertySource(properties = {
@ -48,11 +51,14 @@ import static org.thingsboard.server.common.data.limit.LimitedApi.TRANSPORT_MESS
})
public class MqttGatewayRateLimitsTest extends AbstractControllerTest {
private static final String TOPIC = "v1/gateway/telemetry";
private static final String GATEWAY_TOPIC = "v1/gateway/telemetry";
private static final String DEVICE_TOPIC = "v1/devices/me/telemetry";
private static final String DEVICE_A = "DeviceA";
private static final String DEVICE_B = "DeviceB";
private DeviceId gatewayId;
private static final String DEVICE_PAYLOAD = "{\"temperature\": 42}";
private Device gateway;
private String gatewayAccessToken;
@SpyBean
@ -71,6 +77,10 @@ public class MqttGatewayRateLimitsTest extends AbstractControllerTest {
profileConfiguration.setTransportGatewayTelemetryMsgRateLimit(null);
profileConfiguration.setTransportGatewayTelemetryDataPointsRateLimit(null);
profileConfiguration.setTransportGatewayDeviceMsgRateLimit(null);
profileConfiguration.setTransportGatewayDeviceTelemetryMsgRateLimit(null);
profileConfiguration.setTransportGatewayDeviceTelemetryDataPointsRateLimit(null);
doPost("/api/tenantProfile", tenantProfile);
loginTenantAdmin();
@ -107,24 +117,24 @@ public class MqttGatewayRateLimitsTest extends AbstractControllerTest {
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
client.publishAndWait(TOPIC, getGatewayPayload(DEVICE_A));
client.publishAndWait(GATEWAY_TOPIC, getGatewayPayload(DEVICE_A));
loginTenantAdmin();
Device deviceA = getDeviceByName(DEVICE_A);
var deviceATrigger = createRateLimitsTrigger(deviceA);
var deviceATrigger = createRateLimitsTrigger(deviceA, TRANSPORT_MESSAGES_PER_GATEWAY);
Mockito.verify(notificationRuleProcessor, Mockito.never()).process(eq(deviceATrigger));
try {
client.publishAndWait(TOPIC, getGatewayPayload(DEVICE_B));
client.publishAndWait(GATEWAY_TOPIC, getGatewayPayload(DEVICE_B));
} catch (Exception t) {
}
Device deviceB = getDeviceByName(DEVICE_B);
var deviceBTrigger = createRateLimitsTrigger(deviceB);
var deviceBTrigger = createRateLimitsTrigger(deviceB, TRANSPORT_MESSAGES_PER_GATEWAY);
Mockito.verify(notificationRuleProcessor, Mockito.times(1)).process(deviceBTrigger);
@ -133,6 +143,62 @@ public class MqttGatewayRateLimitsTest extends AbstractControllerTest {
}
}
@Test
public void transportGatewayDeviceMsgRateLimitTest() throws Exception {
transportGatewayDeviceRateLimitTest(profileConfiguration -> profileConfiguration.setTransportGatewayDeviceMsgRateLimit("3:600"));
}
@Test
public void transportGatewayDeviceTelemetryMsgRateLimitTest() throws Exception {
transportGatewayDeviceRateLimitTest(profileConfiguration -> profileConfiguration.setTransportGatewayDeviceTelemetryMsgRateLimit("1:600"));
}
@Test
public void transportGatewayDeviceTelemetryDataPointsRateLimitTest() throws Exception {
transportGatewayDeviceRateLimitTest(profileConfiguration -> profileConfiguration.setTransportGatewayDeviceTelemetryDataPointsRateLimit("1:600"));
}
private void transportGatewayDeviceRateLimitTest(Consumer<DefaultTenantProfileConfiguration> profileConfiguration) throws Exception {
loginSysAdmin();
TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class);
Assert.assertNotNull(tenantProfile);
profileConfiguration.accept((DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration());
doPost("/api/tenantProfile", tenantProfile);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
client.publishAndWait(DEVICE_TOPIC, DEVICE_PAYLOAD.getBytes());
client.disconnect();
var gatewayTrigger = createRateLimitsTrigger(gateway, TRANSPORT_MESSAGES_PER_GATEWAY_DEVICE);
Mockito.verify(notificationRuleProcessor, Mockito.never()).process(eq(gatewayTrigger));
loginTenantAdmin();
client = new MqttTestClient();
try {
client.connectAndWait(gatewayAccessToken);
client.publishAndWait(DEVICE_TOPIC, DEVICE_PAYLOAD.getBytes());
if (client.isConnected()) {
client.disconnect();
}
} catch (Exception t) {
}
Awaitility.await()
.atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> Mockito.verify(notificationRuleProcessor, Mockito.times(1)).process(gatewayTrigger));
if (client.isConnected()) {
client.disconnect();
}
}
private void createGateway() throws Exception {
Device device = new Device();
device.setName("gateway");
@ -141,7 +207,7 @@ public class MqttGatewayRateLimitsTest extends AbstractControllerTest {
device.setAdditionalInfo(additionalInfo);
device = doPost("/api/device", device, Device.class);
assertNotNull(device);
gatewayId = device.getId();
var gatewayId = device.getId();
assertNotNull(gatewayId);
DeviceCredentials deviceCredentials = doGet("/api/device/" + gatewayId + "/credentials", DeviceCredentials.class);
@ -149,6 +215,8 @@ public class MqttGatewayRateLimitsTest extends AbstractControllerTest {
assertEquals(gatewayId, deviceCredentials.getDeviceId());
gatewayAccessToken = deviceCredentials.getCredentialsId();
assertNotNull(gatewayAccessToken);
this.gateway = device;
}
private Device getDeviceByName(String deviceName) throws Exception {
@ -158,13 +226,13 @@ public class MqttGatewayRateLimitsTest extends AbstractControllerTest {
}
private byte[] getGatewayPayload(String deviceName) {
return String.format("{\"%s\": [{\"values\": {\"temperature\": 42}}]}", deviceName).getBytes();
return String.format("{\"%s\": [{\"values\": %s}]}", deviceName, DEVICE_PAYLOAD).getBytes();
}
private RateLimitsTrigger createRateLimitsTrigger(Device device) {
private RateLimitsTrigger createRateLimitsTrigger(Device device, LimitedApi limitedApi) {
return RateLimitsTrigger.builder()
.tenantId(tenantId)
.api(TRANSPORT_MESSAGES_PER_GATEWAY)
.api(limitedApi)
.limitLevel(device.getId())
.limitLevelEntityName(device.getName())
.build();

View File

@ -41,6 +41,7 @@ public enum LimitedApi {
TRANSPORT_MESSAGES_PER_TENANT("transport messages", true),
TRANSPORT_MESSAGES_PER_DEVICE("transport messages per device", false),
TRANSPORT_MESSAGES_PER_GATEWAY("transport messages per gateway", false),
TRANSPORT_MESSAGES_PER_GATEWAY_DEVICE("transport messages per gateway device", false),
EMAILS("emails sending", true);
private Function<DefaultTenantProfileConfiguration, String> configExtractor;

View File

@ -50,6 +50,9 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
private String transportGatewayMsgRateLimit;
private String transportGatewayTelemetryMsgRateLimit;
private String transportGatewayTelemetryDataPointsRateLimit;
private String transportGatewayDeviceMsgRateLimit;
private String transportGatewayDeviceTelemetryMsgRateLimit;
private String transportGatewayDeviceTelemetryDataPointsRateLimit;
private String tenantEntityExportRateLimit;
private String tenantEntityImportRateLimit;

View File

@ -90,6 +90,8 @@ import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.DataConstants.GATEWAY_PARAMETER;
@Slf4j
public class ProtoUtils {
@ -1013,6 +1015,10 @@ public class ProtoUtils {
.setDeviceProfileIdLSB(device.getDeviceProfileId().getId().getLeastSignificantBits())
.setAdditionalInfo(JacksonUtil.toString(device.getAdditionalInfo()));
if (device.getAdditionalInfo().has(GATEWAY_PARAMETER)) {
builder.setIsGateway(device.getAdditionalInfo().get(GATEWAY_PARAMETER).booleanValue());
}
PowerSavingConfiguration psmConfiguration = switch (device.getDeviceData().getTransportConfiguration().getType()) {
case LWM2M -> (Lwm2mDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration();
case COAP -> (CoapDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration();

View File

@ -98,6 +98,7 @@ message SessionInfoProto {
int64 customerIdLSB = 15;
optional int64 gatewayIdMSB = 16;
optional int64 gatewayIdLSB = 17;
bool isGateway = 18;
}
enum SessionEvent {
@ -184,6 +185,7 @@ message DeviceInfoProto {
int64 edrxCycle = 13;
int64 psmActivityTimer = 14;
int64 pagingTransmissionWindow = 15;
bool isGateway = 16;
}
message DeviceProto {

View File

@ -46,6 +46,7 @@ public class SessionInfoCreator {
.setDeviceType(msg.getDeviceInfo().getDeviceType())
.setDeviceProfileIdMSB(msg.getDeviceInfo().getDeviceProfileId().getId().getMostSignificantBits())
.setDeviceProfileIdLSB(msg.getDeviceInfo().getDeviceProfileId().getId().getLeastSignificantBits())
.setIsGateway(msg.getDeviceInfo().isGateway())
.build();
}

View File

@ -38,4 +38,5 @@ public class TransportDeviceInfo implements Serializable {
private Long edrxCycle;
private Long psmActivityTimer;
private Long pagingTransmissionWindow;
private boolean gateway;
}

View File

@ -41,6 +41,7 @@ import java.util.function.BiConsumer;
import java.util.function.Function;
import static org.thingsboard.server.common.transport.limits.TransportLimitsType.DEVICE_LIMITS;
import static org.thingsboard.server.common.transport.limits.TransportLimitsType.GATEWAY_DEVICE_LIMITS;
import static org.thingsboard.server.common.transport.limits.TransportLimitsType.GATEWAY_LIMITS;
import static org.thingsboard.server.common.transport.limits.TransportLimitsType.TENANT_LIMITS;
@ -53,9 +54,11 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi
private final ConcurrentMap<TenantId, Boolean> tenantAllowed = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantId, Set<DeviceId>> tenantDevices = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantId, Set<DeviceId>> tenantGateways = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantId, Set<DeviceId>> tenantGatewayDevices = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantId, EntityTransportRateLimits> perTenantLimits = new ConcurrentHashMap<>();
private final ConcurrentMap<DeviceId, EntityTransportRateLimits> perDeviceLimits = new ConcurrentHashMap<>();
private final ConcurrentMap<DeviceId, EntityTransportRateLimits> perGatewayLimits = new ConcurrentHashMap<>();
private final ConcurrentMap<DeviceId, EntityTransportRateLimits> perGatewayDeviceLimits = new ConcurrentHashMap<>();
private final Map<InetAddress, InetAddressRateLimitStats> ipMap = new ConcurrentHashMap<>();
private final TransportTenantProfileCache tenantProfileCache;
@ -72,21 +75,23 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi
}
@Override
public TbPair<EntityType, Boolean> checkLimits(TenantId tenantId, DeviceId gatewayId, DeviceId deviceId, int dataPoints) {
public TbPair<EntityType, Boolean> checkLimits(TenantId tenantId, DeviceId gatewayId, DeviceId deviceId, int dataPoints, boolean isGateway) {
if (!tenantAllowed.getOrDefault(tenantId, Boolean.TRUE)) {
return TbPair.of(EntityType.API_USAGE_STATE, false);
}
if (!checkEntityRateLimit(dataPoints, getTenantRateLimits(tenantId))) {
return TbPair.of(EntityType.TENANT, false);
}
if (isGateway && !checkEntityRateLimit(dataPoints, getGatewayDeviceRateLimits(tenantId, deviceId))) {
return TbPair.of(EntityType.DEVICE, true);
}
if (gatewayId != null && !checkEntityRateLimit(dataPoints, getGatewayRateLimits(tenantId, gatewayId))) {
return TbPair.of(EntityType.DEVICE, true);
}
if (deviceId != null && !checkEntityRateLimit(dataPoints, getDeviceRateLimits(tenantId, deviceId))) {
if (!isGateway && deviceId != null && !checkEntityRateLimit(dataPoints, getDeviceRateLimits(tenantId, deviceId))) {
return TbPair.of(EntityType.DEVICE, false);
}
return null;
}
@ -104,8 +109,9 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi
EntityTransportRateLimits tenantRateLimitPrototype = createRateLimits(update.getProfile(), TENANT_LIMITS);
EntityTransportRateLimits deviceRateLimitPrototype = createRateLimits(update.getProfile(), DEVICE_LIMITS);
EntityTransportRateLimits gatewayRateLimitPrototype = createRateLimits(update.getProfile(), GATEWAY_LIMITS);
EntityTransportRateLimits gatewayDeviceRateLimitPrototype = createRateLimits(update.getProfile(), GATEWAY_DEVICE_LIMITS);
for (TenantId tenantId : update.getAffectedTenants()) {
update(tenantId, tenantRateLimitPrototype, deviceRateLimitPrototype, gatewayRateLimitPrototype);
update(tenantId, tenantRateLimitPrototype, deviceRateLimitPrototype, gatewayRateLimitPrototype, gatewayDeviceRateLimitPrototype);
}
}
@ -114,26 +120,34 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi
EntityTransportRateLimits tenantRateLimitPrototype = createRateLimits(tenantProfileCache.get(tenantId), TENANT_LIMITS);
EntityTransportRateLimits deviceRateLimitPrototype = createRateLimits(tenantProfileCache.get(tenantId), DEVICE_LIMITS);
EntityTransportRateLimits gatewayRateLimitPrototype = createRateLimits(tenantProfileCache.get(tenantId), GATEWAY_LIMITS);
update(tenantId, tenantRateLimitPrototype, deviceRateLimitPrototype, gatewayRateLimitPrototype);
EntityTransportRateLimits gatewayDeviceRateLimitPrototype = createRateLimits(tenantProfileCache.get(tenantId), GATEWAY_DEVICE_LIMITS);
update(tenantId, tenantRateLimitPrototype, deviceRateLimitPrototype, gatewayRateLimitPrototype, gatewayDeviceRateLimitPrototype);
}
private void update(TenantId tenantId, EntityTransportRateLimits tenantRateLimitPrototype,
EntityTransportRateLimits deviceRateLimitPrototype, EntityTransportRateLimits gatewayRateLimitPrototype) {
private void update(TenantId tenantId, EntityTransportRateLimits tenantRateLimitPrototype, EntityTransportRateLimits deviceRateLimitPrototype,
EntityTransportRateLimits gatewayRateLimitPrototype, EntityTransportRateLimits gatewayDeviceRateLimitPrototype) {
mergeLimits(tenantId, tenantRateLimitPrototype, perTenantLimits::get, perTenantLimits::put);
getTenantDevices(tenantId).forEach(deviceId -> mergeLimits(deviceId, deviceRateLimitPrototype, perDeviceLimits::get, perDeviceLimits::put));
getTenantGateways(tenantId).forEach(deviceId -> mergeLimits(deviceId, gatewayRateLimitPrototype, perGatewayLimits::get, perGatewayLimits::put));
getTenantGateways(tenantId).forEach(gatewayId -> mergeLimits(gatewayId, gatewayRateLimitPrototype, perGatewayLimits::get, perGatewayLimits::put));
getTenantGatewayDevices(tenantId).forEach(gatewayId -> mergeLimits(gatewayId, gatewayDeviceRateLimitPrototype, perGatewayDeviceLimits::get, perGatewayDeviceLimits::put));
}
@Override
public void remove(TenantId tenantId) {
perTenantLimits.remove(tenantId);
tenantDevices.remove(tenantId);
tenantGateways.remove(tenantId);
tenantGatewayDevices.remove(tenantId);
}
@Override
public void remove(DeviceId deviceId) {
perDeviceLimits.remove(deviceId);
perGatewayLimits.remove(deviceId);
perGatewayDeviceLimits.remove(deviceId);
tenantDevices.values().forEach(set -> set.remove(deviceId));
tenantGateways.values().forEach(set -> set.remove(deviceId));
tenantGatewayDevices.values().forEach(set -> set.remove(deviceId));
}
@Override
@ -273,6 +287,11 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi
telemetryMsgRateLimit = newLimit(profile.getTransportGatewayTelemetryMsgRateLimit());
telemetryDpRateLimit = newLimit(profile.getTransportGatewayTelemetryDataPointsRateLimit());
}
case GATEWAY_DEVICE_LIMITS -> {
regularMsgRateLimit = newLimit(profile.getTransportGatewayDeviceMsgRateLimit());
telemetryMsgRateLimit = newLimit(profile.getTransportGatewayDeviceTelemetryMsgRateLimit());
telemetryDpRateLimit = newLimit(profile.getTransportGatewayDeviceTelemetryDataPointsRateLimit());
}
default -> throw new IllegalStateException("Unknown limits type: " + limitsType);
}
@ -296,10 +315,18 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi
});
}
private EntityTransportRateLimits getGatewayRateLimits(TenantId tenantId, DeviceId deviceId) {
return perGatewayLimits.computeIfAbsent(deviceId, k -> {
private EntityTransportRateLimits getGatewayRateLimits(TenantId tenantId, DeviceId gatewayId) {
return perGatewayLimits.computeIfAbsent(gatewayId, k -> {
EntityTransportRateLimits limits = createRateLimits(tenantProfileCache.get(tenantId), GATEWAY_LIMITS);
getTenantGateways(tenantId).add(deviceId);
getTenantGateways(tenantId).add(gatewayId);
return limits;
});
}
private EntityTransportRateLimits getGatewayDeviceRateLimits(TenantId tenantId, DeviceId gatewayId) {
return perGatewayDeviceLimits.computeIfAbsent(gatewayId, k -> {
EntityTransportRateLimits limits = createRateLimits(tenantProfileCache.get(tenantId), GATEWAY_DEVICE_LIMITS);
getTenantGatewayDevices(tenantId).add(gatewayId);
return limits;
});
}
@ -312,4 +339,8 @@ public class DefaultTransportRateLimitService implements TransportRateLimitServi
return tenantGateways.computeIfAbsent(tenantId, id -> ConcurrentHashMap.newKeySet());
}
private Set<DeviceId> getTenantGatewayDevices(TenantId tenantId) {
return tenantGatewayDevices.computeIfAbsent(tenantId, id -> ConcurrentHashMap.newKeySet());
}
}

View File

@ -16,5 +16,5 @@
package org.thingsboard.server.common.transport.limits;
public enum TransportLimitsType {
TENANT_LIMITS, DEVICE_LIMITS, GATEWAY_LIMITS
TENANT_LIMITS, DEVICE_LIMITS, GATEWAY_LIMITS, GATEWAY_DEVICE_LIMITS
}

View File

@ -25,7 +25,7 @@ import java.net.InetSocketAddress;
public interface TransportRateLimitService {
TbPair<EntityType, Boolean> checkLimits(TenantId tenantId, DeviceId gatewayId, DeviceId deviceId, int dataPoints);
TbPair<EntityType, Boolean> checkLimits(TenantId tenantId, DeviceId gatewayId, DeviceId deviceId, int dataPoints, boolean isGateway);
void update(TenantProfileUpdateResult update);

View File

@ -429,8 +429,8 @@ public class DefaultTransportService extends TransportActivityManager implements
@Override
public void process(TenantId tenantId, TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg requestMsg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse> callback) {
log.trace("Processing msg: {}", requestMsg);
DeviceId gatewayid = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB()));
if (!checkLimits(tenantId, gatewayid, null, requestMsg.getDeviceName(), requestMsg, callback, 0)) {
DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB()));
if (!checkLimits(tenantId, gatewayId, null, requestMsg.getDeviceName(), requestMsg, callback, 0, false)) {
return;
}
@ -476,6 +476,7 @@ public class DefaultTransportService extends TransportActivityManager implements
tdi.setAdditionalInfo(di.getAdditionalInfo());
tdi.setDeviceName(di.getDeviceName());
tdi.setDeviceType(di.getDeviceType());
tdi.setGateway(di.getIsGateway());
if (StringUtils.isNotEmpty(di.getPowerMode())) {
tdi.setPowerMode(PowerMode.valueOf(di.getPowerMode()));
tdi.setEdrxCycle(di.getEdrxCycle());
@ -838,15 +839,15 @@ public class DefaultTransportService extends TransportActivityManager implements
gatewayId = new DeviceId(new UUID(sessionInfo.getGatewayIdMSB(), sessionInfo.getGatewayIdLSB()));
}
return checkLimits(tenantId, gatewayId, deviceId, sessionInfo.getDeviceName(), msg, callback, dataPoints);
return checkLimits(tenantId, gatewayId, deviceId, sessionInfo.getDeviceName(), msg, callback, dataPoints, sessionInfo.getIsGateway());
}
private boolean checkLimits(TenantId tenantId, DeviceId gatewayId, DeviceId deviceId, String deviceName, Object msg, TransportServiceCallback<?> callback, int dataPoints) {
private boolean checkLimits(TenantId tenantId, DeviceId gatewayId, DeviceId deviceId, String deviceName, Object msg, TransportServiceCallback<?> callback, int dataPoints, boolean isGateway) {
if (log.isTraceEnabled()) {
log.trace("[{}][{}] Processing msg: {}", tenantId, deviceName, msg);
}
var rateLimitedPair = rateLimitService.checkLimits(tenantId, gatewayId, deviceId, dataPoints);
var rateLimitedPair = rateLimitService.checkLimits(tenantId, gatewayId, deviceId, dataPoints, isGateway);
if (rateLimitedPair == null) {
return true;
} else {
@ -856,9 +857,15 @@ public class DefaultTransportService extends TransportActivityManager implements
}
if (rateLimitedEntityType == EntityType.DEVICE || rateLimitedEntityType == EntityType.TENANT) {
LimitedApi limitedApi =
rateLimitedEntityType == EntityType.TENANT ? LimitedApi.TRANSPORT_MESSAGES_PER_TENANT :
rateLimitedPair.getSecond() ? LimitedApi.TRANSPORT_MESSAGES_PER_GATEWAY : LimitedApi.TRANSPORT_MESSAGES_PER_DEVICE;
LimitedApi limitedApi;
if (rateLimitedEntityType == EntityType.TENANT) {
limitedApi = LimitedApi.TRANSPORT_MESSAGES_PER_TENANT;
} else if (rateLimitedPair.getSecond()) {
limitedApi = isGateway ? LimitedApi.TRANSPORT_MESSAGES_PER_GATEWAY_DEVICE : LimitedApi.TRANSPORT_MESSAGES_PER_GATEWAY;
} else {
limitedApi = LimitedApi.TRANSPORT_MESSAGES_PER_DEVICE;
}
EntityId limitLevel = rateLimitedEntityType == EntityType.DEVICE ? deviceId == null ? gatewayId : deviceId : tenantId;
@ -1023,16 +1030,20 @@ public class DefaultTransportService extends TransportActivityManager implements
} else {
newDeviceProfile = null;
}
JsonNode deviceAdditionalInfo = device.getAdditionalInfo();
boolean isGateway = deviceAdditionalInfo.has(DataConstants.GATEWAY_PARAMETER)
&& deviceAdditionalInfo.get(DataConstants.GATEWAY_PARAMETER).asBoolean();
TransportProtos.SessionInfoProto newSessionInfo = TransportProtos.SessionInfoProto.newBuilder()
.mergeFrom(md.getSessionInfo())
.setDeviceProfileIdMSB(deviceProfileIdMSB)
.setDeviceProfileIdLSB(deviceProfileIdLSB)
.setDeviceName(device.getName())
.setDeviceType(device.getType()).build();
JsonNode deviceAdditionalInfo = device.getAdditionalInfo();
if (deviceAdditionalInfo.has(DataConstants.GATEWAY_PARAMETER)
&& deviceAdditionalInfo.get(DataConstants.GATEWAY_PARAMETER).asBoolean()
&& deviceAdditionalInfo.has(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER)
.setDeviceType(device.getType())
.setIsGateway(isGateway).build();
if (isGateway && deviceAdditionalInfo.has(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER)
&& deviceAdditionalInfo.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).isBoolean()) {
md.setOverwriteActivityTime(deviceAdditionalInfo.get(DataConstants.OVERWRITE_ACTIVITY_TIME_PARAMETER).asBoolean());
}

View File

@ -520,13 +520,17 @@
<tb-rate-limits fxFlex formControlName="transportGatewayMsgRateLimit"
[type]="rateLimitsType.GATEWAY_MESSAGES">
</tb-rate-limits>
<div fxFlex></div>
<tb-rate-limits fxFlex formControlName="transportGatewayDeviceMsgRateLimit"
[type]="rateLimitsType.GATEWAY_DEVICE_MESSAGES">
</tb-rate-limits>
</div>
<div fxFlex fxLayout="row" fxLayout.xs="column" fxLayoutGap.gt-xs="16px">
<tb-rate-limits fxFlex formControlName="transportGatewayTelemetryMsgRateLimit"
[type]="rateLimitsType.GATEWAY_TELEMETRY_MESSAGES">
</tb-rate-limits>
<div fxFlex></div>
<tb-rate-limits fxFlex formControlName="transportGatewayDeviceTelemetryMsgRateLimit"
[type]="rateLimitsType.GATEWAY_DEVICE_TELEMETRY_MESSAGES">
</tb-rate-limits>
</div>
<mat-expansion-panel class="configuration-panel">
<mat-expansion-panel-header>
@ -547,7 +551,9 @@
<tb-rate-limits fxFlex formControlName="transportGatewayTelemetryDataPointsRateLimit"
[type]="rateLimitsType.GATEWAY_TELEMETRY_DATA_POINTS">
</tb-rate-limits>
<div fxFlex></div>
<tb-rate-limits fxFlex formControlName="transportGatewayDeviceTelemetryDataPointsRateLimit"
[type]="rateLimitsType.GATEWAY_DEVICE_TELEMETRY_DATA_POINTS">
</tb-rate-limits>
</div>
<div fxFlex fxLayout="row" fxLayout.xs="column" fxLayoutGap.gt-xs="16px">
<tb-rate-limits fxFlex formControlName="tenantServerRestLimitsConfiguration"

View File

@ -77,6 +77,9 @@ export class DefaultTenantProfileConfigurationComponent implements ControlValueA
transportGatewayMsgRateLimit: [null, []],
transportGatewayTelemetryMsgRateLimit: [null, []],
transportGatewayTelemetryDataPointsRateLimit: [null, []],
transportGatewayDeviceMsgRateLimit: [null, []],
transportGatewayDeviceTelemetryMsgRateLimit: [null, []],
transportGatewayDeviceTelemetryDataPointsRateLimit: [null, []],
tenantEntityExportRateLimit: [null, []],
tenantEntityImportRateLimit: [null, []],
tenantNotificationRequestsRateLimit: [null, []],

View File

@ -29,6 +29,9 @@ export enum RateLimitsType {
GATEWAY_MESSAGES = 'GATEWAY_MESSAGES',
GATEWAY_TELEMETRY_MESSAGES = 'GATEWAY_TELEMETRY_MESSAGES',
GATEWAY_TELEMETRY_DATA_POINTS = 'GATEWAY_TELEMETRY_DATA_POINTS',
GATEWAY_DEVICE_MESSAGES = 'GATEWAY_DEVICE_MESSAGES',
GATEWAY_DEVICE_TELEMETRY_MESSAGES = 'GATEWAY_DEVICE_TELEMETRY_MESSAGES',
GATEWAY_DEVICE_TELEMETRY_DATA_POINTS = 'GATEWAY_DEVICE_TELEMETRY_DATA_POINTS',
TENANT_TELEMETRY_MESSAGES = 'TENANT_TELEMETRY_MESSAGES',
TENANT_TELEMETRY_DATA_POINTS = 'TENANT_TELEMETRY_DATA_POINTS',
TENANT_SERVER_REST_LIMITS_CONFIGURATION = 'TENANT_SERVER_REST_LIMITS_CONFIGURATION',
@ -56,6 +59,9 @@ export const rateLimitsLabelTranslationMap = new Map<RateLimitsType, string>(
[RateLimitsType.GATEWAY_MESSAGES, 'tenant-profile.rate-limits.transport-gateway-msg'],
[RateLimitsType.GATEWAY_TELEMETRY_MESSAGES, 'tenant-profile.rate-limits.transport-gateway-telemetry-msg'],
[RateLimitsType.GATEWAY_TELEMETRY_DATA_POINTS, 'tenant-profile.rate-limits.transport-gateway-telemetry-data-points'],
[RateLimitsType.GATEWAY_DEVICE_MESSAGES, 'tenant-profile.rate-limits.transport-gateway-device-msg'],
[RateLimitsType.GATEWAY_DEVICE_TELEMETRY_MESSAGES, 'tenant-profile.rate-limits.transport-gateway-device-telemetry-msg'],
[RateLimitsType.GATEWAY_DEVICE_TELEMETRY_DATA_POINTS, 'tenant-profile.rate-limits.transport-gateway-device-telemetry-data-points'],
[RateLimitsType.TENANT_SERVER_REST_LIMITS_CONFIGURATION, 'tenant-profile.rest-requests-for-tenant'],
[RateLimitsType.CUSTOMER_SERVER_REST_LIMITS_CONFIGURATION, 'tenant-profile.customer-rest-limits'],
[RateLimitsType.WS_UPDATE_PER_SESSION_RATE_LIMIT, 'tenant-profile.ws-limit-updates-per-session'],
@ -83,6 +89,9 @@ export const rateLimitsDialogTitleTranslationMap = new Map<RateLimitsType, strin
[RateLimitsType.GATEWAY_MESSAGES, 'tenant-profile.rate-limits.edit-transport-gateway-msg-title'],
[RateLimitsType.GATEWAY_TELEMETRY_MESSAGES, 'tenant-profile.rate-limits.edit-transport-gateway-telemetry-msg-title'],
[RateLimitsType.GATEWAY_TELEMETRY_DATA_POINTS, 'tenant-profile.rate-limits.edit-transport-gateway-telemetry-data-points-title'],
[RateLimitsType.GATEWAY_DEVICE_MESSAGES, 'tenant-profile.rate-limits.edit-transport-gateway-device-msg-title'],
[RateLimitsType.GATEWAY_DEVICE_TELEMETRY_MESSAGES, 'tenant-profile.rate-limits.edit-transport-gateway-device-telemetry-msg-title'],
[RateLimitsType.GATEWAY_DEVICE_TELEMETRY_DATA_POINTS, 'tenant-profile.rate-limits.edit-transport-gateway-device-telemetry-data-points-title'],
[RateLimitsType.CUSTOMER_SERVER_REST_LIMITS_CONFIGURATION, 'tenant-profile.rate-limits.edit-customer-rest-limits-title'],
[RateLimitsType.WS_UPDATE_PER_SESSION_RATE_LIMIT, 'tenant-profile.rate-limits.edit-ws-limit-updates-per-session-title'],
[RateLimitsType.CASSANDRA_QUERY_TENANT_RATE_LIMITS_CONFIGURATION, 'tenant-profile.rate-limits.edit-cassandra-tenant-limits-configuration-title'],

View File

@ -25,6 +25,8 @@ export enum LimitedApi {
CASSANDRA_QUERIES = 'CASSANDRA_QUERIES',
TRANSPORT_MESSAGES_PER_TENANT = 'TRANSPORT_MESSAGES_PER_TENANT',
TRANSPORT_MESSAGES_PER_DEVICE = 'TRANSPORT_MESSAGES_PER_DEVICE',
TRANSPORT_MESSAGES_PER_GATEWAY = 'TRANSPORT_MESSAGES_PER_GATEWAY',
TRANSPORT_MESSAGES_PER_GATEWAY_DEVICE = 'TRANSPORT_MESSAGES_PER_GATEWAY_DEVICE',
EDGE_EVENTS = 'EDGE_EVENTS',
EDGE_EVENTS_PER_EDGE = 'EDGE_EVENTS_PER_EDGE',
EDGE_UPLINK_MESSAGES = 'EDGE_UPLINK_MESSAGES',
@ -43,6 +45,8 @@ export const LimitedApiTranslationMap = new Map<LimitedApi, string>(
[LimitedApi.CASSANDRA_QUERIES, 'api-limit.cassandra-queries'],
[LimitedApi.TRANSPORT_MESSAGES_PER_TENANT, 'api-limit.transport-messages'],
[LimitedApi.TRANSPORT_MESSAGES_PER_DEVICE, 'api-limit.transport-messages-per-device'],
[LimitedApi.TRANSPORT_MESSAGES_PER_GATEWAY, 'api-limit.transport-messages-per-gateway'],
[LimitedApi.TRANSPORT_MESSAGES_PER_GATEWAY_DEVICE, 'api-limit.transport-messages-per-gateway_device'],
[LimitedApi.EDGE_EVENTS, 'api-limit.edge-events'],
[LimitedApi.EDGE_EVENTS_PER_EDGE, 'api-limit.edge-events-per-edge'],
[LimitedApi.EDGE_UPLINK_MESSAGES, 'api-limit.edge-uplink-messages'],

View File

@ -894,6 +894,8 @@
"rest-api-requests-per-customer": "REST API requests per customer",
"transport-messages": "Transport messages",
"transport-messages-per-device": "Transport messages per device",
"transport-messages-per-gateway": "Transport messages per gateway",
"transport-messages-per-gateway-device": "Transport messages per gateway device",
"ws-updates-per-session": "WS updates per session",
"edge-events": "Edge events",
"edge-events-per-edge": "Edge events per edge",
@ -4448,6 +4450,9 @@
"transport-gateway-msg-rate-limit": "Transport gateway messages",
"transport-gateway-telemetry-msg-rate-limit": "Transport gateway telemetry messages",
"transport-gateway-telemetry-data-points-rate-limit": "Transport gateway telemetry data points",
"transport-gateway-device-msg-rate-limit": "Transport gateway device messages",
"transport-gateway-device-telemetry-msg-rate-limit": "Transport gateway device telemetry messages",
"transport-gateway-device-telemetry-data-points-rate-limit": "Transport gateway device telemetry data points",
"tenant-entity-export-rate-limit": "Entity version creation",
"tenant-entity-import-rate-limit": "Entity version load",
"tenant-notification-request-rate-limit": "Notification requests",
@ -4532,6 +4537,9 @@
"edit-transport-gateway-msg-title": "Edit transport gateway messages rate limits",
"edit-transport-gateway-telemetry-msg-title": "Edit transport gateway telemetry messages rate limits",
"edit-transport-gateway-telemetry-data-points-title": "Edit transport gateway telemetry data points rate limits",
"edit-transport-gateway-device-msg-title": "Edit transport gateway device messages rate limits",
"edit-transport-gateway-device-telemetry-msg-title": "Edit transport gateway device telemetry messages rate limits",
"edit-transport-gateway-device-telemetry-data-points-title": "Edit transport gateway device telemetry data points rate limits",
"edit-tenant-rest-limits-title": "Edit REST requests for tenant rate limits",
"edit-customer-rest-limits-title": "Edit REST requests for customer rate limits",
"edit-ws-limit-updates-per-session-title": "Edit WS updates per session rate limits",
@ -4568,6 +4576,9 @@
"transport-gateway-msg": "Transport gateway messages",
"transport-gateway-telemetry-msg": "Transport gateway telemetry messages",
"transport-gateway-telemetry-data-points": "Transport gateway telemetry data points",
"transport-gateway-device-msg": "Transport gateway device messages",
"transport-gateway-device-telemetry-msg": "Transport gateway device telemetry messages",
"transport-gateway-device-telemetry-data-points": "Transport gateway device telemetry data points",
"sec": "sec"
}
},