From 101097f9471879eea2d86c2e3fcba3fa94620bd7 Mon Sep 17 00:00:00 2001 From: Vitaliy Paromsiy Date: Wed, 22 Nov 2017 21:46:57 +0200 Subject: [PATCH] Api Quota --- .../src/main/resources/thingsboard.yml | 12 +++ common/transport/pom.xml | 12 +++ .../server/common/transport/quota/Clock.java | 45 ++++++++++ .../quota/HostRequestLimitPolicy.java | 38 ++++++++ .../quota/HostRequestsQuotaService.java | 76 ++++++++++++++++ .../common/transport/quota/QuotaService.java | 25 ++++++ .../inmemory/HostRequestIntervalRegistry.java | 67 ++++++++++++++ .../quota/inmemory/IntervalCount.java | 68 ++++++++++++++ .../inmemory/IntervalRegistryCleaner.java | 66 ++++++++++++++ .../inmemory/IntervalRegistryLogger.java | 88 +++++++++++++++++++ .../common/transport/quota/ClockTest.java | 66 ++++++++++++++ .../quota/HostRequestLimitPolicyTest.java | 46 ++++++++++ .../quota/HostRequestsQuotaServiceTest.java | 64 ++++++++++++++ .../HostRequestIntervalRegistryTest.java | 57 ++++++++++++ .../quota/inmemory/IntervalCountTest.java | 65 ++++++++++++++ .../inmemory/IntervalRegistryLoggerTest.java | 61 +++++++++++++ .../transport/coap/CoapTransportResource.java | 13 ++- .../transport/coap/CoapTransportService.java | 27 +++--- .../server/transport/coap/CoapServerTest.java | 6 ++ .../transport/http/DeviceApiController.java | 58 +++++++++--- .../transport/mqtt/MqttTransportHandler.java | 73 ++++++++------- .../mqtt/MqttTransportServerInitializer.java | 20 ++--- .../transport/mqtt/MqttTransportService.java | 7 +- 23 files changed, 990 insertions(+), 70 deletions(-) create mode 100644 common/transport/src/main/java/org/thingsboard/server/common/transport/quota/Clock.java create mode 100644 common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicy.java create mode 100644 common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaService.java create mode 100644 common/transport/src/main/java/org/thingsboard/server/common/transport/quota/QuotaService.java create mode 100644 common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java create mode 100644 common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCount.java create mode 100644 common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryCleaner.java create mode 100644 common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLogger.java create mode 100644 common/transport/src/test/java/org/thingsboard/server/common/transport/quota/ClockTest.java create mode 100644 common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicyTest.java create mode 100644 common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java create mode 100644 common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistryTest.java create mode 100644 common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCountTest.java create mode 100644 common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLoggerTest.java diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index e1bad01cce..657e28e173 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -107,6 +107,18 @@ coap: adaptor: "${COAP_ADAPTOR_NAME:JsonCoapAdaptor}" timeout: "${COAP_TIMEOUT:10000}" +#Quota parameters +quota: + host: + limit: "${QUOTA_HOST_LIMIT:10000}" + intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}" + ttlMs: "${QUOTA_HOST_TTL_MS:60000}" + cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}" + enabled: "${QUOTA_HOST_ENABLED:false}" + log: + topSize: 10 + intervalMin: 2 + database: type: "${DATABASE_TYPE:sql}" # cassandra OR sql diff --git a/common/transport/pom.xml b/common/transport/pom.xml index f609e9a2c8..3243131e1c 100644 --- a/common/transport/pom.xml +++ b/common/transport/pom.xml @@ -74,6 +74,18 @@ mockito-all test + + org.springframework + spring-context + + + com.google.guava + guava + + + org.apache.commons + commons-lang3 + diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/Clock.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/Clock.java new file mode 100644 index 0000000000..e832354b1e --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/Clock.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public final class Clock { + + private static long time = 0L; + + private Clock() { + } + + + public static long millis() { + return time == 0 ? System.currentTimeMillis() : time; + } + + public static void setMillis(long millis) { + time = millis; + } + + public static void shift(long delta) { + time += delta; + } + + public static void reset() { + time = 0; + } +} diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicy.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicy.java new file mode 100644 index 0000000000..83d664123e --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicy.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +@Component +public class HostRequestLimitPolicy { + + private final long limit; + + public HostRequestLimitPolicy(@Value("${quota.host.limit}") long limit) { + this.limit = limit; + } + + public boolean isValid(long currentValue) { + return currentValue <= limit; + } + +} diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaService.java new file mode 100644 index 0000000000..7ef4df258f --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaService.java @@ -0,0 +1,76 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.transport.quota.inmemory.HostRequestIntervalRegistry; +import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner; +import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +@Service +@Slf4j +public class HostRequestsQuotaService implements QuotaService { + + private final HostRequestIntervalRegistry requestRegistry; + private final HostRequestLimitPolicy requestsPolicy; + private final IntervalRegistryCleaner registryCleaner; + private final IntervalRegistryLogger registryLogger; + private final boolean enabled; + + public HostRequestsQuotaService(HostRequestIntervalRegistry requestRegistry, HostRequestLimitPolicy requestsPolicy, + IntervalRegistryCleaner registryCleaner, IntervalRegistryLogger registryLogger, + @Value("${quota.host.enabled}") boolean enabled) { + this.requestRegistry = requestRegistry; + this.requestsPolicy = requestsPolicy; + this.registryCleaner = registryCleaner; + this.registryLogger = registryLogger; + this.enabled = enabled; + } + + @PostConstruct + public void init() { + if (enabled) { + registryCleaner.schedule(); + registryLogger.schedule(); + } + } + + @PreDestroy + public void close() { + if (enabled) { + registryCleaner.stop(); + registryLogger.stop(); + } + } + + @Override + public boolean isQuotaExceeded(String key) { + if (enabled) { + long count = requestRegistry.tick(key); + return requestsPolicy.isValid(count); + } + return false; + } +} diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/QuotaService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/QuotaService.java new file mode 100644 index 0000000000..cea5db63d1 --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/QuotaService.java @@ -0,0 +1,25 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public interface QuotaService { + + boolean isQuotaExceeded(String key); +} diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java new file mode 100644 index 0000000000..0f5da2b1a8 --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java @@ -0,0 +1,67 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota.inmemory; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +@Component +@Slf4j +public class HostRequestIntervalRegistry { + + private final Map hostCounts = new ConcurrentHashMap<>(); + private final long intervalDurationMs; + private final long ttlMs; + + public HostRequestIntervalRegistry(@Value("${quota.host.intervalMs}") long intervalDurationMs, + @Value("${quota.host.ttlMs}") long ttlMs) { + this.intervalDurationMs = intervalDurationMs; + this.ttlMs = ttlMs; + } + + @PostConstruct + public void init() { + if (ttlMs < intervalDurationMs) { + log.warn("TTL for IntervalRegistry [{}] smaller than interval duration [{}]", ttlMs, intervalDurationMs); + } + } + + public long tick(String clientHostId) { + IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs)); + return intervalCount.resetIfExpiredAndTick(); + } + + public void clean() { + hostCounts.entrySet().removeIf(entry -> entry.getValue().silenceDuration() > ttlMs); + } + + public Map getContent() { + return hostCounts.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + interval -> interval.getValue().getCount())); + } +} diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCount.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCount.java new file mode 100644 index 0000000000..8301b8e6f1 --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCount.java @@ -0,0 +1,68 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota.inmemory; + + +import org.thingsboard.server.common.transport.quota.Clock; + +import java.util.concurrent.atomic.LongAdder; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public class IntervalCount { + + private final LongAdder adder = new LongAdder(); + private final long intervalDurationMs; + private volatile long startTime; + private volatile long lastTickTime; + + public IntervalCount(long intervalDurationMs) { + this.intervalDurationMs = intervalDurationMs; + startTime = Clock.millis(); + } + + public long resetIfExpiredAndTick() { + if (isExpired()) { + reset(); + } + tick(); + return adder.sum(); + } + + public long silenceDuration() { + return Clock.millis() - lastTickTime; + } + + public long getCount() { + return adder.sum(); + } + + private void tick() { + adder.add(1); + lastTickTime = Clock.millis(); + } + + private void reset() { + adder.reset(); + startTime = Clock.millis(); + } + + private boolean isExpired() { + return (Clock.millis() - startTime) > intervalDurationMs; + } +} diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryCleaner.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryCleaner.java new file mode 100644 index 0000000000..1e2076a0b0 --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryCleaner.java @@ -0,0 +1,66 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota.inmemory; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +@Component +@Slf4j +public class IntervalRegistryCleaner { + + private final HostRequestIntervalRegistry intervalRegistry; + private final long cleanPeriodMs; + private ScheduledExecutorService executor; + + public IntervalRegistryCleaner(HostRequestIntervalRegistry intervalRegistry, @Value("${quota.host.cleanPeriodMs}") long cleanPeriodMs) { + this.intervalRegistry = intervalRegistry; + this.cleanPeriodMs = cleanPeriodMs; + } + + public void schedule() { + if (executor != null) { + throw new IllegalStateException("Registry Cleaner already scheduled"); + } + executor = Executors.newSingleThreadScheduledExecutor(); + executor.scheduleAtFixedRate(this::clean, cleanPeriodMs, cleanPeriodMs, TimeUnit.MILLISECONDS); + } + + public void stop() { + if (executor != null) { + executor.shutdown(); + } + } + + public void clean() { + try { + intervalRegistry.clean(); + } catch (RuntimeException ex) { + log.error("Could not clear Interval Registry", ex); + } + } + +} diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLogger.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLogger.java new file mode 100644 index 0000000000..91797d4422 --- /dev/null +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLogger.java @@ -0,0 +1,88 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota.inmemory; + +import com.google.common.collect.MinMaxPriorityQueue; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.Comparator; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +@Component +@Slf4j +public class IntervalRegistryLogger { + + private final int topSize; + private final HostRequestIntervalRegistry intervalRegistry; + private final long logIntervalMin; + private ScheduledExecutorService executor; + + public IntervalRegistryLogger(@Value("${quota.log.topSize}") int topSize, @Value("${quota.log.intervalMin}") long logIntervalMin, + HostRequestIntervalRegistry intervalRegistry) { + this.topSize = topSize; + this.logIntervalMin = logIntervalMin; + this.intervalRegistry = intervalRegistry; + } + + public void schedule() { + if (executor != null) { + throw new IllegalStateException("Registry Cleaner already scheduled"); + } + executor = Executors.newSingleThreadScheduledExecutor(); + executor.scheduleAtFixedRate(this::logStatistic, logIntervalMin, logIntervalMin, TimeUnit.MINUTES); + } + + public void stop() { + if (executor != null) { + executor.shutdown(); + } + } + + public void logStatistic() { + Map top = getTopElements(intervalRegistry.getContent()); + log(top); + } + + protected Map getTopElements(Map countMap) { + MinMaxPriorityQueue> topQueue = MinMaxPriorityQueue + .orderedBy(Comparator.comparing((Function, Long>) Map.Entry::getValue).reversed()) + .maximumSize(topSize) + .create(countMap.entrySet()); + + return topQueue.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private void log(Map top) { + StringBuilder builder = new StringBuilder("Quota Statistic : "); + for (Map.Entry host : top.entrySet()) { + builder.append(host.getKey()).append(" : ").append(host.getValue()); + } + + log.info(builder.toString()); + } +} diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/ClockTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/ClockTest.java new file mode 100644 index 0000000000..6ed5445aec --- /dev/null +++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/ClockTest.java @@ -0,0 +1,66 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public class ClockTest { + + @Before + public void init() { + Clock.reset(); + } + + @After + public void clear() { + Clock.reset(); + } + + @Test + public void defaultClockUseSystemTime() { + assertFalse(Clock.millis() > System.currentTimeMillis()); + } + + @Test + public void timeCanBeSet() { + Clock.setMillis(100L); + assertEquals(100L, Clock.millis()); + } + + @Test + public void clockCanBeReseted() { + Clock.setMillis(100L); + assertEquals(100L, Clock.millis()); + Clock.reset(); + assertFalse(Clock.millis() > System.currentTimeMillis()); + } + + @Test + public void timeIsShifted() { + Clock.setMillis(100L); + Clock.shift(50L); + assertEquals(150L, Clock.millis()); + } + +} \ No newline at end of file diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicyTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicyTest.java new file mode 100644 index 0000000000..f28d17c5f9 --- /dev/null +++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicyTest.java @@ -0,0 +1,46 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public class HostRequestLimitPolicyTest { + + private HostRequestLimitPolicy limitPolicy = new HostRequestLimitPolicy(10L); + + @Test + public void ifCurrentValueLessThenLimitItIsValid() { + assertTrue(limitPolicy.isValid(9)); + } + + @Test + public void ifCurrentValueEqualsToLimitItIsValid() { + assertTrue(limitPolicy.isValid(10)); + } + + @Test + public void ifCurrentValueGreaterThenLimitItIsValid() { + assertFalse(limitPolicy.isValid(11)); + } + +} \ No newline at end of file diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java new file mode 100644 index 0000000000..475fcccead --- /dev/null +++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java @@ -0,0 +1,64 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota; + +import org.junit.Before; +import org.junit.Test; +import org.thingsboard.server.common.transport.quota.inmemory.HostRequestIntervalRegistry; +import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner; +import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public class HostRequestsQuotaServiceTest { + + private HostRequestsQuotaService quotaService; + + private HostRequestIntervalRegistry requestRegistry = mock(HostRequestIntervalRegistry.class); + private HostRequestLimitPolicy requestsPolicy = mock(HostRequestLimitPolicy.class); + private IntervalRegistryCleaner registryCleaner = mock(IntervalRegistryCleaner.class); + private IntervalRegistryLogger registryLogger = mock(IntervalRegistryLogger.class); + + @Before + public void init() { + quotaService = new HostRequestsQuotaService(requestRegistry, requestsPolicy, registryCleaner, registryLogger, true); + } + + @Test + public void hostQuotaValidated() { + when(requestRegistry.tick("key")).thenReturn(10L); + when(requestsPolicy.isValid(10L)).thenReturn(true); + + assertTrue(quotaService.isQuotaExceeded("key")); + + verify(requestRegistry).tick("key"); + verify(requestsPolicy).isValid(10L); + verifyNoMoreInteractions(requestRegistry, requestsPolicy); + } + + @Test + public void serviceCanBeDisabled() { + quotaService = new HostRequestsQuotaService(requestRegistry, requestsPolicy, registryCleaner, registryLogger, false); + assertFalse(quotaService.isQuotaExceeded("key")); + verifyNoMoreInteractions(requestRegistry, requestsPolicy); + } +} \ No newline at end of file diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistryTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistryTest.java new file mode 100644 index 0000000000..cb21ccebd5 --- /dev/null +++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistryTest.java @@ -0,0 +1,57 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota.inmemory; + +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public class HostRequestIntervalRegistryTest { + + private HostRequestIntervalRegistry registry; + + @Before + public void init() { + registry = new HostRequestIntervalRegistry(10000L, 100L); + } + + @Test + public void newHostCreateNewInterval() { + assertEquals(1L, registry.tick("host1")); + } + + @Test + public void existingHostUpdated() { + registry.tick("aaa"); + assertEquals(1L, registry.tick("bbb")); + assertEquals(2L, registry.tick("aaa")); + } + + @Test + public void expiredIntervalsCleaned() throws InterruptedException { + registry.tick("aaa"); + Thread.sleep(150L); + registry.tick("bbb"); + registry.clean(); + assertEquals(1L, registry.tick("aaa")); + assertEquals(2L, registry.tick("bbb")); + } +} \ No newline at end of file diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCountTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCountTest.java new file mode 100644 index 0000000000..7bdcafde3c --- /dev/null +++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCountTest.java @@ -0,0 +1,65 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota.inmemory; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.thingsboard.server.common.transport.quota.Clock; + +import static org.junit.Assert.assertEquals; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public class IntervalCountTest { + + @Before + public void init() { + Clock.setMillis(1000L); + } + + @After + public void clear() { + Clock.reset(); + } + + @Test + public void ticksInSameIntervalAreSummed() { + IntervalCount intervalCount = new IntervalCount(100L); + assertEquals(1L, intervalCount.resetIfExpiredAndTick()); + Clock.shift(100); + assertEquals(2L, intervalCount.resetIfExpiredAndTick()); + } + + @Test + public void oldDataCleanedWhenIntervalExpired() { + IntervalCount intervalCount = new IntervalCount(100L); + assertEquals(1L, intervalCount.resetIfExpiredAndTick()); + Clock.shift(101); + assertEquals(1L, intervalCount.resetIfExpiredAndTick()); + } + + @Test + public void silenceDurationCalculatedFromLastTick() { + IntervalCount intervalCount = new IntervalCount(100L); + assertEquals(1L, intervalCount.resetIfExpiredAndTick()); + Clock.shift(10L); + assertEquals(10L, intervalCount.silenceDuration()); + } + +} \ No newline at end of file diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLoggerTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLoggerTest.java new file mode 100644 index 0000000000..cc25b4c402 --- /dev/null +++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLoggerTest.java @@ -0,0 +1,61 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.quota.inmemory; + +import com.google.common.collect.ImmutableMap; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +/** + * @author Vitaliy Paromskiy + * @version 1.0 + */ +public class IntervalRegistryLoggerTest { + + private IntervalRegistryLogger logger; + + private HostRequestIntervalRegistry requestRegistry = mock(HostRequestIntervalRegistry.class); + + @Before + public void init() { + logger = new IntervalRegistryLogger(3, 10, requestRegistry); + } + + @Test + public void onlyMaxHostsCollected() { + Map map = ImmutableMap.of("a", 8L, "b", 3L, "c", 1L, "d", 3L); + Map actual = logger.getTopElements(map); + Map expected = ImmutableMap.of("a", 8L, "b", 3L, "d", 3L); + + assertEquals(expected, actual); + } + + @Test + public void emptyMapProcessedCorrectly() { + Map map = Collections.emptyMap(); + Map actual = logger.getTopElements(map); + Map expected = Collections.emptyMap(); + + assertEquals(expected, actual); + } + +} \ No newline at end of file diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 834a911e77..2ffbf05f6e 100644 --- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -34,6 +34,7 @@ import org.thingsboard.server.common.msg.session.*; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.common.transport.quota.QuotaService; import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; import org.thingsboard.server.transport.coap.session.CoapExchangeObserverProxy; import org.thingsboard.server.transport.coap.session.CoapSessionCtx; @@ -51,13 +52,16 @@ public class CoapTransportResource extends CoapResource { private final CoapTransportAdaptor adaptor; private final SessionMsgProcessor processor; private final DeviceAuthService authService; + private final QuotaService quotaService; private final Field observerField; private final long timeout; - public CoapTransportResource(SessionMsgProcessor processor, DeviceAuthService authService, CoapTransportAdaptor adaptor, String name, long timeout) { + public CoapTransportResource(SessionMsgProcessor processor, DeviceAuthService authService, CoapTransportAdaptor adaptor, String name, + long timeout, QuotaService quotaService) { super(name); this.processor = processor; this.authService = authService; + this.quotaService = quotaService; this.adaptor = adaptor; this.timeout = timeout; // This is important to turn off existing observable logic in @@ -70,6 +74,13 @@ public class CoapTransportResource extends CoapResource { @Override public void handleGET(CoapExchange exchange) { + if(quotaService.isQuotaExceeded(exchange.getSourceAddress().getHostAddress())) { + log.warn("Missing feature type parameter"); + log.warn("COAP Quota exceeded for [{}:{}] . Disconnect", exchange.getSourceAddress().getHostAddress(), exchange.getSourcePort()); + exchange.respond(ResponseCode.BAD_REQUEST); + return; + } + Optional featureType = getFeatureType(exchange.advanced().getRequest()); if (!featureType.isPresent()) { log.trace("Missing feature type parameter"); diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java index a78c718d4f..7dcf77c036 100644 --- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java +++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java @@ -15,26 +15,24 @@ */ package org.thingsboard.server.transport.coap; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; - import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.CoapResource; import org.eclipse.californium.core.CoapServer; import org.eclipse.californium.core.network.CoapEndpoint; -import org.thingsboard.server.common.transport.SessionMsgProcessor; -import org.thingsboard.server.common.transport.auth.DeviceAuthService; -import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.transport.SessionMsgProcessor; +import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.common.transport.quota.QuotaService; +import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; @Service("CoapTransportService") @Slf4j @@ -54,6 +52,9 @@ public class CoapTransportService { @Autowired(required = false) private DeviceAuthService authService; + @Autowired(required = false) + private QuotaService quotaService; + @Value("${coap.bind_address}") private String host; @@ -83,7 +84,7 @@ public class CoapTransportService { private void createResources() { CoapResource api = new CoapResource(API); - api.add(new CoapTransportResource(processor, authService, adaptor, V1, timeout)); + api.add(new CoapTransportResource(processor, authService, adaptor, V1, timeout, quotaService)); server.add(api); } diff --git a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java index 706af80ec9..5ac589099a 100644 --- a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java +++ b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java @@ -50,6 +50,7 @@ import org.thingsboard.server.common.msg.session.*; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.auth.DeviceAuthResult; import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.common.transport.quota.QuotaService; import java.util.ArrayList; import java.util.List; @@ -131,6 +132,11 @@ public class CoapServerTest { } }; } + + @Bean + public static QuotaService quotaService() { + return key -> false; + } } @Autowired diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index 70767afdfc..c799dcf7b7 100644 --- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -35,10 +35,11 @@ import org.thingsboard.server.common.msg.session.FromDeviceMsg; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.common.transport.quota.QuotaService; import org.thingsboard.server.transport.http.session.HttpSessionCtx; +import javax.servlet.http.HttpServletRequest; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -59,11 +60,18 @@ public class DeviceApiController { @Autowired(required = false) private DeviceAuthService authService; + @Autowired(required = false) + private QuotaService quotaService; + @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json") public DeferredResult getDeviceAttributes(@PathVariable("deviceToken") String deviceToken, @RequestParam(value = "clientKeys", required = false, defaultValue = "") String clientKeys, - @RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys) { + @RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys, + HttpServletRequest httpRequest) { DeferredResult responseWriter = new DeferredResult(); + if (quotaExceeded(httpRequest, responseWriter)) { + return responseWriter; + } HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); if (ctx.login(new DeviceTokenCredentials(deviceToken))) { GetAttributesRequest request; @@ -84,8 +92,11 @@ public class DeviceApiController { @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.POST) public DeferredResult postDeviceAttributes(@PathVariable("deviceToken") String deviceToken, - @RequestBody String json) { + @RequestBody String json, HttpServletRequest request) { DeferredResult responseWriter = new DeferredResult(); + if (quotaExceeded(request, responseWriter)) { + return responseWriter; + } HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); if (ctx.login(new DeviceTokenCredentials(deviceToken))) { try { @@ -101,8 +112,11 @@ public class DeviceApiController { @RequestMapping(value = "/{deviceToken}/telemetry", method = RequestMethod.POST) public DeferredResult postTelemetry(@PathVariable("deviceToken") String deviceToken, - @RequestBody String json) { + @RequestBody String json, HttpServletRequest request) { DeferredResult responseWriter = new DeferredResult(); + if (quotaExceeded(request, responseWriter)) { + return responseWriter; + } HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); if (ctx.login(new DeviceTokenCredentials(deviceToken))) { try { @@ -118,15 +132,20 @@ public class DeviceApiController { @RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.GET, produces = "application/json") public DeferredResult subscribeToCommands(@PathVariable("deviceToken") String deviceToken, - @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout) { - return subscribe(deviceToken, timeout, new RpcSubscribeMsg()); + @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout, + HttpServletRequest request) { + + return subscribe(deviceToken, timeout, new RpcSubscribeMsg(), request); } @RequestMapping(value = "/{deviceToken}/rpc/{requestId}", method = RequestMethod.POST) public DeferredResult replyToCommand(@PathVariable("deviceToken") String deviceToken, @PathVariable("requestId") Integer requestId, - @RequestBody String json) { + @RequestBody String json, HttpServletRequest request) { DeferredResult responseWriter = new DeferredResult(); + if (quotaExceeded(request, responseWriter)) { + return responseWriter; + } HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); if (ctx.login(new DeviceTokenCredentials(deviceToken))) { try { @@ -143,8 +162,11 @@ public class DeviceApiController { @RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.POST) public DeferredResult postRpcRequest(@PathVariable("deviceToken") String deviceToken, - @RequestBody String json) { + @RequestBody String json, HttpServletRequest httpRequest) { DeferredResult responseWriter = new DeferredResult(); + if (quotaExceeded(httpRequest, responseWriter)) { + return responseWriter; + } HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); if (ctx.login(new DeviceTokenCredentials(deviceToken))) { try { @@ -163,12 +185,17 @@ public class DeviceApiController { @RequestMapping(value = "/{deviceToken}/attributes/updates", method = RequestMethod.GET, produces = "application/json") public DeferredResult subscribeToAttributes(@PathVariable("deviceToken") String deviceToken, - @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout) { - return subscribe(deviceToken, timeout, new AttributesSubscribeMsg()); + @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout, + HttpServletRequest httpRequest) { + + return subscribe(deviceToken, timeout, new AttributesSubscribeMsg(), httpRequest); } - private DeferredResult subscribe(String deviceToken, long timeout, FromDeviceMsg msg) { + private DeferredResult subscribe(String deviceToken, long timeout, FromDeviceMsg msg, HttpServletRequest httpRequest) { DeferredResult responseWriter = new DeferredResult(); + if (quotaExceeded(httpRequest, responseWriter)) { + return responseWriter; + } HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout); if (ctx.login(new DeviceTokenCredentials(deviceToken))) { try { @@ -195,4 +222,13 @@ public class DeviceApiController { processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg)); } + private boolean quotaExceeded(HttpServletRequest request, DeferredResult responseWriter) { + if (quotaService.isQuotaExceeded(request.getRemoteAddr())) { + log.warn("REST Quota exceeded for [{}] . Disconnect", request.getRemoteAddr()); + responseWriter.setResult(new ResponseEntity<>(HttpStatus.BANDWIDTH_LIMIT_EXCEEDED)); + return true; + } + return false; + } + } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 2e6abfde96..6fd559bdbf 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -16,7 +16,6 @@ package org.thingsboard.server.transport.mqtt; import com.fasterxml.jackson.databind.JsonNode; -import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.mqtt.*; @@ -36,18 +35,18 @@ import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.common.transport.quota.QuotaService; import org.thingsboard.server.dao.EncryptionUtil; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; -import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; +import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx; import org.thingsboard.server.transport.mqtt.util.SslUtil; import javax.net.ssl.SSLPeerUnverifiedException; import javax.security.cert.X509Certificate; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.ArrayList; import java.util.List; @@ -72,13 +71,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private final DeviceService deviceService; private final DeviceAuthService authService; private final RelationService relationService; + private final QuotaService quotaService; private final SslHandler sslHandler; private volatile boolean connected; private volatile InetSocketAddress address; private volatile GatewaySessionCtx gatewaySessionCtx; public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, - MqttTransportAdaptor adaptor, SslHandler sslHandler) { + MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) { this.processor = processor; this.deviceService = deviceService; this.relationService = relationService; @@ -87,6 +87,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor); this.sessionId = deviceSessionCtx.getSessionId().toUidStr(); this.sslHandler = sslHandler; + this.quotaService = quotaService; } @Override @@ -102,35 +103,43 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (msg.fixedHeader() == null) { log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort()); processDisconnect(ctx); - } else { - deviceSessionCtx.setChannel(ctx); - switch (msg.fixedHeader().messageType()) { - case CONNECT: - processConnect(ctx, (MqttConnectMessage) msg); - break; - case PUBLISH: - processPublish(ctx, (MqttPublishMessage) msg); - break; - case SUBSCRIBE: - processSubscribe(ctx, (MqttSubscribeMessage) msg); - break; - case UNSUBSCRIBE: - processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); - break; - case PINGREQ: - if (checkConnected(ctx)) { - ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); - } - break; - case DISCONNECT: - if (checkConnected(ctx)) { - processDisconnect(ctx); - } - break; - default: - break; - } + return; } + + if (quotaService.isQuotaExceeded(address.getHostName())) { + log.warn("MQTT Quota exceeded for [{}:{}] . Disconnect", address.getHostName(), address.getPort()); + processDisconnect(ctx); + return; + } + + deviceSessionCtx.setChannel(ctx); + switch (msg.fixedHeader().messageType()) { + case CONNECT: + processConnect(ctx, (MqttConnectMessage) msg); + break; + case PUBLISH: + processPublish(ctx, (MqttPublishMessage) msg); + break; + case SUBSCRIBE: + processSubscribe(ctx, (MqttSubscribeMessage) msg); + break; + case UNSUBSCRIBE: + processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); + break; + case PINGREQ: + if (checkConnected(ctx)) { + ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); + } + break; + case DISCONNECT: + if (checkConnected(ctx)) { + processDisconnect(ctx); + } + break; + default: + break; + } + } private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java index 1469290837..93812a80a8 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java @@ -15,27 +15,19 @@ */ package org.thingsboard.server.transport.mqtt; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.codec.mqtt.MqttEncoder; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; -import io.netty.handler.ssl.util.SelfSignedCertificate; -import org.springframework.beans.factory.annotation.Value; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.common.transport.quota.QuotaService; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; -import javax.net.ssl.SSLException; -import java.security.cert.CertificateException; - /** * @author Andrew Shvayka */ @@ -49,16 +41,18 @@ public class MqttTransportServerInitializer extends ChannelInitializer