Api Quota

This commit is contained in:
Vitaliy Paromsiy 2017-11-22 21:46:57 +02:00
parent 99ccb83155
commit 101097f947
23 changed files with 990 additions and 70 deletions

View File

@ -107,6 +107,18 @@ coap:
adaptor: "${COAP_ADAPTOR_NAME:JsonCoapAdaptor}"
timeout: "${COAP_TIMEOUT:10000}"
#Quota parameters
quota:
host:
limit: "${QUOTA_HOST_LIMIT:10000}"
intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}"
ttlMs: "${QUOTA_HOST_TTL_MS:60000}"
cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}"
enabled: "${QUOTA_HOST_ENABLED:false}"
log:
topSize: 10
intervalMin: 2
database:
type: "${DATABASE_TYPE:sql}" # cassandra OR sql

View File

@ -74,6 +74,18 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,45 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.quota;
/**
* @author Vitaliy Paromskiy
* @version 1.0
*/
public final class Clock {
private static long time = 0L;
private Clock() {
}
public static long millis() {
return time == 0 ? System.currentTimeMillis() : time;
}
public static void setMillis(long millis) {
time = millis;
}
public static void shift(long delta) {
time += delta;
}
public static void reset() {
time = 0;
}
}

View File

@ -0,0 +1,38 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.quota;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* @author Vitaliy Paromskiy
* @version 1.0
*/
@Component
public class HostRequestLimitPolicy {
private final long limit;
public HostRequestLimitPolicy(@Value("${quota.host.limit}") long limit) {
this.limit = limit;
}
public boolean isValid(long currentValue) {
return currentValue <= limit;
}
}

View File

@ -0,0 +1,76 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.quota;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.quota.inmemory.HostRequestIntervalRegistry;
import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner;
import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
* @author Vitaliy Paromskiy
* @version 1.0
*/
@Service
@Slf4j
public class HostRequestsQuotaService implements QuotaService {
private final HostRequestIntervalRegistry requestRegistry;
private final HostRequestLimitPolicy requestsPolicy;
private final IntervalRegistryCleaner registryCleaner;
private final IntervalRegistryLogger registryLogger;
private final boolean enabled;
public HostRequestsQuotaService(HostRequestIntervalRegistry requestRegistry, HostRequestLimitPolicy requestsPolicy,
IntervalRegistryCleaner registryCleaner, IntervalRegistryLogger registryLogger,
@Value("${quota.host.enabled}") boolean enabled) {
this.requestRegistry = requestRegistry;
this.requestsPolicy = requestsPolicy;
this.registryCleaner = registryCleaner;
this.registryLogger = registryLogger;
this.enabled = enabled;
}
@PostConstruct
public void init() {
if (enabled) {
registryCleaner.schedule();
registryLogger.schedule();
}
}
@PreDestroy
public void close() {
if (enabled) {
registryCleaner.stop();
registryLogger.stop();
}
}
@Override
public boolean isQuotaExceeded(String key) {
if (enabled) {
long count = requestRegistry.tick(key);
return requestsPolicy.isValid(count);
}
return false;
}
}

View File

@ -0,0 +1,25 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.quota;
/**
* @author Vitaliy Paromskiy
* @version 1.0
*/
public interface QuotaService {
boolean isQuotaExceeded(String key);
}

View File

@ -0,0 +1,67 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.quota.inmemory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* @author Vitaliy Paromskiy
* @version 1.0
*/
@Component
@Slf4j
public class HostRequestIntervalRegistry {
private final Map<String, IntervalCount> hostCounts = new ConcurrentHashMap<>();
private final long intervalDurationMs;
private final long ttlMs;
public HostRequestIntervalRegistry(@Value("${quota.host.intervalMs}") long intervalDurationMs,
@Value("${quota.host.ttlMs}") long ttlMs) {
this.intervalDurationMs = intervalDurationMs;
this.ttlMs = ttlMs;
}
@PostConstruct
public void init() {
if (ttlMs < intervalDurationMs) {
log.warn("TTL for IntervalRegistry [{}] smaller than interval duration [{}]", ttlMs, intervalDurationMs);
}
}
public long tick(String clientHostId) {
IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs));
return intervalCount.resetIfExpiredAndTick();
}
public void clean() {
hostCounts.entrySet().removeIf(entry -> entry.getValue().silenceDuration() > ttlMs);
}
public Map<String, Long> getContent() {
return hostCounts.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
interval -> interval.getValue().getCount()));
}
}

View File

@ -0,0 +1,68 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.quota.inmemory;
import org.thingsboard.server.common.transport.quota.Clock;
import java.util.concurrent.atomic.LongAdder;
/**
* @author Vitaliy Paromskiy
* @version 1.0
*/
public class IntervalCount {
private final LongAdder adder = new LongAdder();
private final long intervalDurationMs;
private volatile long startTime;
private volatile long lastTickTime;
public IntervalCount(long intervalDurationMs) {
this.intervalDurationMs = intervalDurationMs;
startTime = Clock.millis();
}
public long resetIfExpiredAndTick() {
if (isExpired()) {
reset();
}
tick();
return adder.sum();
}
public long silenceDuration() {
return Clock.millis() - lastTickTime;
}
public long getCount() {
return adder.sum();
}
private void tick() {
adder.add(1);
lastTickTime = Clock.millis();
}
private void reset() {
adder.reset();
startTime = Clock.millis();
}
private boolean isExpired() {
return (Clock.millis() - startTime) > intervalDurationMs;
}
}

View File

@ -0,0 +1,66 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.quota.inmemory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author Vitaliy Paromskiy
* @version 1.0
*/
@Component
@Slf4j
public class IntervalRegistryCleaner {
private final HostRequestIntervalRegistry intervalRegistry;
private final long cleanPeriodMs;
private ScheduledExecutorService executor;
public IntervalRegistryCleaner(HostRequestIntervalRegistry intervalRegistry, @Value("${quota.host.cleanPeriodMs}") long cleanPeriodMs) {
this.intervalRegistry = intervalRegistry;
this.cleanPeriodMs = cleanPeriodMs;
}
public void schedule() {
if (executor != null) {
throw new IllegalStateException("Registry Cleaner already scheduled");
}
executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(this::clean, cleanPeriodMs, cleanPeriodMs, TimeUnit.MILLISECONDS);
}
public void stop() {
if (executor != null) {
executor.shutdown();
}
}
public void clean() {
try {
intervalRegistry.clean();
} catch (RuntimeException ex) {
log.error("Could not clear Interval Registry", ex);
}
}
}

View File

@ -0,0 +1,88 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.quota.inmemory;
import com.google.common.collect.MinMaxPriorityQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* @author Vitaliy Paromskiy
* @version 1.0
*/
@Component
@Slf4j
public class IntervalRegistryLogger {
private final int topSize;
private final HostRequestIntervalRegistry intervalRegistry;
private final long logIntervalMin;
private ScheduledExecutorService executor;
public IntervalRegistryLogger(@Value("${quota.log.topSize}") int topSize, @Value("${quota.log.intervalMin}") long logIntervalMin,
HostRequestIntervalRegistry intervalRegistry) {
this.topSize = topSize;
this.logIntervalMin = logIntervalMin;
this.intervalRegistry = intervalRegistry;
}
public void schedule() {
if (executor != null) {
throw new IllegalStateException("Registry Cleaner already scheduled");
}
executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(this::logStatistic, logIntervalMin, logIntervalMin, TimeUnit.MINUTES);
}
public void stop() {
if (executor != null) {
executor.shutdown();
}
}
public void logStatistic() {
Map<String, Long> top = getTopElements(intervalRegistry.getContent());
log(top);
}
protected Map<String, Long> getTopElements(Map<String, Long> countMap) {
MinMaxPriorityQueue<Map.Entry<String, Long>> topQueue = MinMaxPriorityQueue
.orderedBy(Comparator.comparing((Function<Map.Entry<String, Long>, Long>) Map.Entry::getValue).reversed())
.maximumSize(topSize)
.create(countMap.entrySet());
return topQueue.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private void log(Map<String, Long> top) {
StringBuilder builder = new StringBuilder("Quota Statistic : ");
for (Map.Entry<String, Long> host : top.entrySet()) {
builder.append(host.getKey()).append(" : ").append(host.getValue());
}
log.info(builder.toString());
}
}

View File

@ -0,0 +1,66 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.quota;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* @author Vitaliy Paromskiy
* @version 1.0
*/
public class ClockTest {
@Before
public void init() {
Clock.reset();
}
@After
public void clear() {
Clock.reset();
}
@Test
public void defaultClockUseSystemTime() {
assertFalse(Clock.millis() > System.currentTimeMillis());
}
@Test
public void timeCanBeSet() {
Clock.setMillis(100L);
assertEquals(100L, Clock.millis());
}
@Test
public void clockCanBeReseted() {
Clock.setMillis(100L);
assertEquals(100L, Clock.millis());
Clock.reset();
assertFalse(Clock.millis() > System.currentTimeMillis());
}
@Test
public void timeIsShifted() {
Clock.setMillis(100L);
Clock.shift(50L);
assertEquals(150L, Clock.millis());
}
}

View File

@ -0,0 +1,46 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.quota;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* @author Vitaliy Paromskiy
* @version 1.0
*/
public class HostRequestLimitPolicyTest {
private HostRequestLimitPolicy limitPolicy = new HostRequestLimitPolicy(10L);
@Test
public void ifCurrentValueLessThenLimitItIsValid() {
assertTrue(limitPolicy.isValid(9));
}
@Test
public void ifCurrentValueEqualsToLimitItIsValid() {
assertTrue(limitPolicy.isValid(10));
}
@Test
public void ifCurrentValueGreaterThenLimitItIsValid() {
assertFalse(limitPolicy.isValid(11));
}
}

View File

@ -0,0 +1,64 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.quota;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.transport.quota.inmemory.HostRequestIntervalRegistry;
import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner;
import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
/**
* @author Vitaliy Paromskiy
* @version 1.0
*/
public class HostRequestsQuotaServiceTest {
private HostRequestsQuotaService quotaService;
private HostRequestIntervalRegistry requestRegistry = mock(HostRequestIntervalRegistry.class);
private HostRequestLimitPolicy requestsPolicy = mock(HostRequestLimitPolicy.class);
private IntervalRegistryCleaner registryCleaner = mock(IntervalRegistryCleaner.class);
private IntervalRegistryLogger registryLogger = mock(IntervalRegistryLogger.class);
@Before
public void init() {
quotaService = new HostRequestsQuotaService(requestRegistry, requestsPolicy, registryCleaner, registryLogger, true);
}
@Test
public void hostQuotaValidated() {
when(requestRegistry.tick("key")).thenReturn(10L);
when(requestsPolicy.isValid(10L)).thenReturn(true);
assertTrue(quotaService.isQuotaExceeded("key"));
verify(requestRegistry).tick("key");
verify(requestsPolicy).isValid(10L);
verifyNoMoreInteractions(requestRegistry, requestsPolicy);
}
@Test
public void serviceCanBeDisabled() {
quotaService = new HostRequestsQuotaService(requestRegistry, requestsPolicy, registryCleaner, registryLogger, false);
assertFalse(quotaService.isQuotaExceeded("key"));
verifyNoMoreInteractions(requestRegistry, requestsPolicy);
}
}

View File

@ -0,0 +1,57 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.quota.inmemory;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
* @author Vitaliy Paromskiy
* @version 1.0
*/
public class HostRequestIntervalRegistryTest {
private HostRequestIntervalRegistry registry;
@Before
public void init() {
registry = new HostRequestIntervalRegistry(10000L, 100L);
}
@Test
public void newHostCreateNewInterval() {
assertEquals(1L, registry.tick("host1"));
}
@Test
public void existingHostUpdated() {
registry.tick("aaa");
assertEquals(1L, registry.tick("bbb"));
assertEquals(2L, registry.tick("aaa"));
}
@Test
public void expiredIntervalsCleaned() throws InterruptedException {
registry.tick("aaa");
Thread.sleep(150L);
registry.tick("bbb");
registry.clean();
assertEquals(1L, registry.tick("aaa"));
assertEquals(2L, registry.tick("bbb"));
}
}

View File

@ -0,0 +1,65 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.quota.inmemory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.transport.quota.Clock;
import static org.junit.Assert.assertEquals;
/**
* @author Vitaliy Paromskiy
* @version 1.0
*/
public class IntervalCountTest {
@Before
public void init() {
Clock.setMillis(1000L);
}
@After
public void clear() {
Clock.reset();
}
@Test
public void ticksInSameIntervalAreSummed() {
IntervalCount intervalCount = new IntervalCount(100L);
assertEquals(1L, intervalCount.resetIfExpiredAndTick());
Clock.shift(100);
assertEquals(2L, intervalCount.resetIfExpiredAndTick());
}
@Test
public void oldDataCleanedWhenIntervalExpired() {
IntervalCount intervalCount = new IntervalCount(100L);
assertEquals(1L, intervalCount.resetIfExpiredAndTick());
Clock.shift(101);
assertEquals(1L, intervalCount.resetIfExpiredAndTick());
}
@Test
public void silenceDurationCalculatedFromLastTick() {
IntervalCount intervalCount = new IntervalCount(100L);
assertEquals(1L, intervalCount.resetIfExpiredAndTick());
Clock.shift(10L);
assertEquals(10L, intervalCount.silenceDuration());
}
}

View File

@ -0,0 +1,61 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.quota.inmemory;
import com.google.common.collect.ImmutableMap;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
/**
* @author Vitaliy Paromskiy
* @version 1.0
*/
public class IntervalRegistryLoggerTest {
private IntervalRegistryLogger logger;
private HostRequestIntervalRegistry requestRegistry = mock(HostRequestIntervalRegistry.class);
@Before
public void init() {
logger = new IntervalRegistryLogger(3, 10, requestRegistry);
}
@Test
public void onlyMaxHostsCollected() {
Map<String, Long> map = ImmutableMap.of("a", 8L, "b", 3L, "c", 1L, "d", 3L);
Map<String, Long> actual = logger.getTopElements(map);
Map<String, Long> expected = ImmutableMap.of("a", 8L, "b", 3L, "d", 3L);
assertEquals(expected, actual);
}
@Test
public void emptyMapProcessedCorrectly() {
Map<String, Long> map = Collections.emptyMap();
Map<String, Long> actual = logger.getTopElements(map);
Map<String, Long> expected = Collections.emptyMap();
assertEquals(expected, actual);
}
}

View File

@ -34,6 +34,7 @@ import org.thingsboard.server.common.msg.session.*;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
import org.thingsboard.server.transport.coap.session.CoapExchangeObserverProxy;
import org.thingsboard.server.transport.coap.session.CoapSessionCtx;
@ -51,13 +52,16 @@ public class CoapTransportResource extends CoapResource {
private final CoapTransportAdaptor adaptor;
private final SessionMsgProcessor processor;
private final DeviceAuthService authService;
private final QuotaService quotaService;
private final Field observerField;
private final long timeout;
public CoapTransportResource(SessionMsgProcessor processor, DeviceAuthService authService, CoapTransportAdaptor adaptor, String name, long timeout) {
public CoapTransportResource(SessionMsgProcessor processor, DeviceAuthService authService, CoapTransportAdaptor adaptor, String name,
long timeout, QuotaService quotaService) {
super(name);
this.processor = processor;
this.authService = authService;
this.quotaService = quotaService;
this.adaptor = adaptor;
this.timeout = timeout;
// This is important to turn off existing observable logic in
@ -70,6 +74,13 @@ public class CoapTransportResource extends CoapResource {
@Override
public void handleGET(CoapExchange exchange) {
if(quotaService.isQuotaExceeded(exchange.getSourceAddress().getHostAddress())) {
log.warn("Missing feature type parameter");
log.warn("COAP Quota exceeded for [{}:{}] . Disconnect", exchange.getSourceAddress().getHostAddress(), exchange.getSourcePort());
exchange.respond(ResponseCode.BAD_REQUEST);
return;
}
Optional<FeatureType> featureType = getFeatureType(exchange.advanced().getRequest());
if (!featureType.isPresent()) {
log.trace("Missing feature type parameter");

View File

@ -15,26 +15,24 @@
*/
package org.thingsboard.server.transport.coap;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.CoapResource;
import org.eclipse.californium.core.CoapServer;
import org.eclipse.californium.core.network.CoapEndpoint;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@Service("CoapTransportService")
@Slf4j
@ -54,6 +52,9 @@ public class CoapTransportService {
@Autowired(required = false)
private DeviceAuthService authService;
@Autowired(required = false)
private QuotaService quotaService;
@Value("${coap.bind_address}")
private String host;
@ -83,7 +84,7 @@ public class CoapTransportService {
private void createResources() {
CoapResource api = new CoapResource(API);
api.add(new CoapTransportResource(processor, authService, adaptor, V1, timeout));
api.add(new CoapTransportResource(processor, authService, adaptor, V1, timeout, quotaService));
server.add(api);
}

View File

@ -50,6 +50,7 @@ import org.thingsboard.server.common.msg.session.*;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthResult;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
import java.util.ArrayList;
import java.util.List;
@ -131,6 +132,11 @@ public class CoapServerTest {
}
};
}
@Bean
public static QuotaService quotaService() {
return key -> false;
}
}
@Autowired

View File

@ -35,10 +35,11 @@ import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.transport.http.session.HttpSessionCtx;
import javax.servlet.http.HttpServletRequest;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@ -59,11 +60,18 @@ public class DeviceApiController {
@Autowired(required = false)
private DeviceAuthService authService;
@Autowired(required = false)
private QuotaService quotaService;
@RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json")
public DeferredResult<ResponseEntity> getDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
@RequestParam(value = "clientKeys", required = false, defaultValue = "") String clientKeys,
@RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys) {
@RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys,
HttpServletRequest httpRequest) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
if (quotaExceeded(httpRequest, responseWriter)) {
return responseWriter;
}
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
GetAttributesRequest request;
@ -84,8 +92,11 @@ public class DeviceApiController {
@RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.POST)
public DeferredResult<ResponseEntity> postDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
@RequestBody String json) {
@RequestBody String json, HttpServletRequest request) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
if (quotaExceeded(request, responseWriter)) {
return responseWriter;
}
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
@ -101,8 +112,11 @@ public class DeviceApiController {
@RequestMapping(value = "/{deviceToken}/telemetry", method = RequestMethod.POST)
public DeferredResult<ResponseEntity> postTelemetry(@PathVariable("deviceToken") String deviceToken,
@RequestBody String json) {
@RequestBody String json, HttpServletRequest request) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
if (quotaExceeded(request, responseWriter)) {
return responseWriter;
}
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
@ -118,15 +132,20 @@ public class DeviceApiController {
@RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.GET, produces = "application/json")
public DeferredResult<ResponseEntity> subscribeToCommands(@PathVariable("deviceToken") String deviceToken,
@RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout) {
return subscribe(deviceToken, timeout, new RpcSubscribeMsg());
@RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
HttpServletRequest request) {
return subscribe(deviceToken, timeout, new RpcSubscribeMsg(), request);
}
@RequestMapping(value = "/{deviceToken}/rpc/{requestId}", method = RequestMethod.POST)
public DeferredResult<ResponseEntity> replyToCommand(@PathVariable("deviceToken") String deviceToken,
@PathVariable("requestId") Integer requestId,
@RequestBody String json) {
@RequestBody String json, HttpServletRequest request) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
if (quotaExceeded(request, responseWriter)) {
return responseWriter;
}
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
@ -143,8 +162,11 @@ public class DeviceApiController {
@RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.POST)
public DeferredResult<ResponseEntity> postRpcRequest(@PathVariable("deviceToken") String deviceToken,
@RequestBody String json) {
@RequestBody String json, HttpServletRequest httpRequest) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
if (quotaExceeded(httpRequest, responseWriter)) {
return responseWriter;
}
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
@ -163,12 +185,17 @@ public class DeviceApiController {
@RequestMapping(value = "/{deviceToken}/attributes/updates", method = RequestMethod.GET, produces = "application/json")
public DeferredResult<ResponseEntity> subscribeToAttributes(@PathVariable("deviceToken") String deviceToken,
@RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout) {
return subscribe(deviceToken, timeout, new AttributesSubscribeMsg());
@RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
HttpServletRequest httpRequest) {
return subscribe(deviceToken, timeout, new AttributesSubscribeMsg(), httpRequest);
}
private DeferredResult<ResponseEntity> subscribe(String deviceToken, long timeout, FromDeviceMsg msg) {
private DeferredResult<ResponseEntity> subscribe(String deviceToken, long timeout, FromDeviceMsg msg, HttpServletRequest httpRequest) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
if (quotaExceeded(httpRequest, responseWriter)) {
return responseWriter;
}
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
@ -195,4 +222,13 @@ public class DeviceApiController {
processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg));
}
private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
if (quotaService.isQuotaExceeded(request.getRemoteAddr())) {
log.warn("REST Quota exceeded for [{}] . Disconnect", request.getRemoteAddr());
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BANDWIDTH_LIMIT_EXCEEDED));
return true;
}
return false;
}
}

View File

@ -16,7 +16,6 @@
package org.thingsboard.server.transport.mqtt;
import com.fasterxml.jackson.databind.JsonNode;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.*;
@ -36,18 +35,18 @@ import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.dao.EncryptionUtil;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
import org.thingsboard.server.transport.mqtt.util.SslUtil;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.X509Certificate;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
@ -72,13 +71,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final DeviceService deviceService;
private final DeviceAuthService authService;
private final RelationService relationService;
private final QuotaService quotaService;
private final SslHandler sslHandler;
private volatile boolean connected;
private volatile InetSocketAddress address;
private volatile GatewaySessionCtx gatewaySessionCtx;
public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
MqttTransportAdaptor adaptor, SslHandler sslHandler) {
MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) {
this.processor = processor;
this.deviceService = deviceService;
this.relationService = relationService;
@ -87,6 +87,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor);
this.sessionId = deviceSessionCtx.getSessionId().toUidStr();
this.sslHandler = sslHandler;
this.quotaService = quotaService;
}
@Override
@ -102,35 +103,43 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (msg.fixedHeader() == null) {
log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
processDisconnect(ctx);
} else {
deviceSessionCtx.setChannel(ctx);
switch (msg.fixedHeader().messageType()) {
case CONNECT:
processConnect(ctx, (MqttConnectMessage) msg);
break;
case PUBLISH:
processPublish(ctx, (MqttPublishMessage) msg);
break;
case SUBSCRIBE:
processSubscribe(ctx, (MqttSubscribeMessage) msg);
break;
case UNSUBSCRIBE:
processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
break;
case PINGREQ:
if (checkConnected(ctx)) {
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
}
break;
case DISCONNECT:
if (checkConnected(ctx)) {
processDisconnect(ctx);
}
break;
default:
break;
}
return;
}
if (quotaService.isQuotaExceeded(address.getHostName())) {
log.warn("MQTT Quota exceeded for [{}:{}] . Disconnect", address.getHostName(), address.getPort());
processDisconnect(ctx);
return;
}
deviceSessionCtx.setChannel(ctx);
switch (msg.fixedHeader().messageType()) {
case CONNECT:
processConnect(ctx, (MqttConnectMessage) msg);
break;
case PUBLISH:
processPublish(ctx, (MqttPublishMessage) msg);
break;
case SUBSCRIBE:
processSubscribe(ctx, (MqttSubscribeMessage) msg);
break;
case UNSUBSCRIBE:
processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
break;
case PINGREQ:
if (checkConnected(ctx)) {
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
}
break;
case DISCONNECT:
if (checkConnected(ctx)) {
processDisconnect(ctx);
}
break;
default:
break;
}
}
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {

View File

@ -15,27 +15,19 @@
*/
package org.thingsboard.server.transport.mqtt;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import javax.net.ssl.SSLException;
import java.security.cert.CertificateException;
/**
* @author Andrew Shvayka
*/
@ -49,16 +41,18 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
private final RelationService relationService;
private final MqttTransportAdaptor adaptor;
private final MqttSslHandlerProvider sslHandlerProvider;
private final QuotaService quotaService;
public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
MqttTransportAdaptor adaptor,
MqttSslHandlerProvider sslHandlerProvider) {
MqttTransportAdaptor adaptor, MqttSslHandlerProvider sslHandlerProvider,
QuotaService quotaService) {
this.processor = processor;
this.deviceService = deviceService;
this.authService = authService;
this.relationService = relationService;
this.adaptor = adaptor;
this.sslHandlerProvider = sslHandlerProvider;
this.quotaService = quotaService;
}
@Override
@ -72,7 +66,9 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
pipeline.addLast("decoder", new MqttDecoder(MAX_PAYLOAD_SIZE));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService, adaptor, sslHandler);
MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService,
adaptor, sslHandler, quotaService);
pipeline.addLast(handler);
ch.closeFuture().addListener(handler);
}

View File

@ -28,6 +28,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
@ -63,6 +64,9 @@ public class MqttTransportService {
@Autowired(required = false)
private MqttSslHandlerProvider sslHandlerProvider;
@Autowired(required = false)
private QuotaService quotaService;
@Value("${mqtt.bind_address}")
private String host;
@Value("${mqtt.bind_port}")
@ -99,7 +103,8 @@ public class MqttTransportService {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService, adaptor, sslHandlerProvider));
.childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService,
adaptor, sslHandlerProvider, quotaService));
serverChannel = b.bind(host, port).sync().channel();
log.info("Mqtt transport started!");