diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index e1bad01cce..657e28e173 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -107,6 +107,18 @@ coap:
adaptor: "${COAP_ADAPTOR_NAME:JsonCoapAdaptor}"
timeout: "${COAP_TIMEOUT:10000}"
+#Quota parameters
+quota:
+ host:
+ limit: "${QUOTA_HOST_LIMIT:10000}"
+ intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}"
+ ttlMs: "${QUOTA_HOST_TTL_MS:60000}"
+ cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}"
+ enabled: "${QUOTA_HOST_ENABLED:false}"
+ log:
+ topSize: 10
+ intervalMin: 2
+
database:
type: "${DATABASE_TYPE:sql}" # cassandra OR sql
diff --git a/common/transport/pom.xml b/common/transport/pom.xml
index f609e9a2c8..3243131e1c 100644
--- a/common/transport/pom.xml
+++ b/common/transport/pom.xml
@@ -74,6 +74,18 @@
mockito-all
test
+
+ org.springframework
+ spring-context
+
+
+ com.google.guava
+ guava
+
+
+ org.apache.commons
+ commons-lang3
+
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/Clock.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/Clock.java
new file mode 100644
index 0000000000..e832354b1e
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/Clock.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota;
+
+/**
+ * @author Vitaliy Paromskiy
+ * @version 1.0
+ */
+public final class Clock {
+
+ private static long time = 0L;
+
+ private Clock() {
+ }
+
+
+ public static long millis() {
+ return time == 0 ? System.currentTimeMillis() : time;
+ }
+
+ public static void setMillis(long millis) {
+ time = millis;
+ }
+
+ public static void shift(long delta) {
+ time += delta;
+ }
+
+ public static void reset() {
+ time = 0;
+ }
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicy.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicy.java
new file mode 100644
index 0000000000..83d664123e
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicy.java
@@ -0,0 +1,38 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author Vitaliy Paromskiy
+ * @version 1.0
+ */
+@Component
+public class HostRequestLimitPolicy {
+
+ private final long limit;
+
+ public HostRequestLimitPolicy(@Value("${quota.host.limit}") long limit) {
+ this.limit = limit;
+ }
+
+ public boolean isValid(long currentValue) {
+ return currentValue <= limit;
+ }
+
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaService.java
new file mode 100644
index 0000000000..7ef4df258f
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaService.java
@@ -0,0 +1,76 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.transport.quota.inmemory.HostRequestIntervalRegistry;
+import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner;
+import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+/**
+ * @author Vitaliy Paromskiy
+ * @version 1.0
+ */
+@Service
+@Slf4j
+public class HostRequestsQuotaService implements QuotaService {
+
+ private final HostRequestIntervalRegistry requestRegistry;
+ private final HostRequestLimitPolicy requestsPolicy;
+ private final IntervalRegistryCleaner registryCleaner;
+ private final IntervalRegistryLogger registryLogger;
+ private final boolean enabled;
+
+ public HostRequestsQuotaService(HostRequestIntervalRegistry requestRegistry, HostRequestLimitPolicy requestsPolicy,
+ IntervalRegistryCleaner registryCleaner, IntervalRegistryLogger registryLogger,
+ @Value("${quota.host.enabled}") boolean enabled) {
+ this.requestRegistry = requestRegistry;
+ this.requestsPolicy = requestsPolicy;
+ this.registryCleaner = registryCleaner;
+ this.registryLogger = registryLogger;
+ this.enabled = enabled;
+ }
+
+ @PostConstruct
+ public void init() {
+ if (enabled) {
+ registryCleaner.schedule();
+ registryLogger.schedule();
+ }
+ }
+
+ @PreDestroy
+ public void close() {
+ if (enabled) {
+ registryCleaner.stop();
+ registryLogger.stop();
+ }
+ }
+
+ @Override
+ public boolean isQuotaExceeded(String key) {
+ if (enabled) {
+ long count = requestRegistry.tick(key);
+ return requestsPolicy.isValid(count);
+ }
+ return false;
+ }
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/QuotaService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/QuotaService.java
new file mode 100644
index 0000000000..cea5db63d1
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/QuotaService.java
@@ -0,0 +1,25 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota;
+
+/**
+ * @author Vitaliy Paromskiy
+ * @version 1.0
+ */
+public interface QuotaService {
+
+ boolean isQuotaExceeded(String key);
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java
new file mode 100644
index 0000000000..0f5da2b1a8
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota.inmemory;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * @author Vitaliy Paromskiy
+ * @version 1.0
+ */
+@Component
+@Slf4j
+public class HostRequestIntervalRegistry {
+
+ private final Map hostCounts = new ConcurrentHashMap<>();
+ private final long intervalDurationMs;
+ private final long ttlMs;
+
+ public HostRequestIntervalRegistry(@Value("${quota.host.intervalMs}") long intervalDurationMs,
+ @Value("${quota.host.ttlMs}") long ttlMs) {
+ this.intervalDurationMs = intervalDurationMs;
+ this.ttlMs = ttlMs;
+ }
+
+ @PostConstruct
+ public void init() {
+ if (ttlMs < intervalDurationMs) {
+ log.warn("TTL for IntervalRegistry [{}] smaller than interval duration [{}]", ttlMs, intervalDurationMs);
+ }
+ }
+
+ public long tick(String clientHostId) {
+ IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs));
+ return intervalCount.resetIfExpiredAndTick();
+ }
+
+ public void clean() {
+ hostCounts.entrySet().removeIf(entry -> entry.getValue().silenceDuration() > ttlMs);
+ }
+
+ public Map getContent() {
+ return hostCounts.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ interval -> interval.getValue().getCount()));
+ }
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCount.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCount.java
new file mode 100644
index 0000000000..8301b8e6f1
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCount.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota.inmemory;
+
+
+import org.thingsboard.server.common.transport.quota.Clock;
+
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * @author Vitaliy Paromskiy
+ * @version 1.0
+ */
+public class IntervalCount {
+
+ private final LongAdder adder = new LongAdder();
+ private final long intervalDurationMs;
+ private volatile long startTime;
+ private volatile long lastTickTime;
+
+ public IntervalCount(long intervalDurationMs) {
+ this.intervalDurationMs = intervalDurationMs;
+ startTime = Clock.millis();
+ }
+
+ public long resetIfExpiredAndTick() {
+ if (isExpired()) {
+ reset();
+ }
+ tick();
+ return adder.sum();
+ }
+
+ public long silenceDuration() {
+ return Clock.millis() - lastTickTime;
+ }
+
+ public long getCount() {
+ return adder.sum();
+ }
+
+ private void tick() {
+ adder.add(1);
+ lastTickTime = Clock.millis();
+ }
+
+ private void reset() {
+ adder.reset();
+ startTime = Clock.millis();
+ }
+
+ private boolean isExpired() {
+ return (Clock.millis() - startTime) > intervalDurationMs;
+ }
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryCleaner.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryCleaner.java
new file mode 100644
index 0000000000..1e2076a0b0
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryCleaner.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota.inmemory;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Vitaliy Paromskiy
+ * @version 1.0
+ */
+@Component
+@Slf4j
+public class IntervalRegistryCleaner {
+
+ private final HostRequestIntervalRegistry intervalRegistry;
+ private final long cleanPeriodMs;
+ private ScheduledExecutorService executor;
+
+ public IntervalRegistryCleaner(HostRequestIntervalRegistry intervalRegistry, @Value("${quota.host.cleanPeriodMs}") long cleanPeriodMs) {
+ this.intervalRegistry = intervalRegistry;
+ this.cleanPeriodMs = cleanPeriodMs;
+ }
+
+ public void schedule() {
+ if (executor != null) {
+ throw new IllegalStateException("Registry Cleaner already scheduled");
+ }
+ executor = Executors.newSingleThreadScheduledExecutor();
+ executor.scheduleAtFixedRate(this::clean, cleanPeriodMs, cleanPeriodMs, TimeUnit.MILLISECONDS);
+ }
+
+ public void stop() {
+ if (executor != null) {
+ executor.shutdown();
+ }
+ }
+
+ public void clean() {
+ try {
+ intervalRegistry.clean();
+ } catch (RuntimeException ex) {
+ log.error("Could not clear Interval Registry", ex);
+ }
+ }
+
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLogger.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLogger.java
new file mode 100644
index 0000000000..91797d4422
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLogger.java
@@ -0,0 +1,88 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota.inmemory;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * @author Vitaliy Paromskiy
+ * @version 1.0
+ */
+@Component
+@Slf4j
+public class IntervalRegistryLogger {
+
+ private final int topSize;
+ private final HostRequestIntervalRegistry intervalRegistry;
+ private final long logIntervalMin;
+ private ScheduledExecutorService executor;
+
+ public IntervalRegistryLogger(@Value("${quota.log.topSize}") int topSize, @Value("${quota.log.intervalMin}") long logIntervalMin,
+ HostRequestIntervalRegistry intervalRegistry) {
+ this.topSize = topSize;
+ this.logIntervalMin = logIntervalMin;
+ this.intervalRegistry = intervalRegistry;
+ }
+
+ public void schedule() {
+ if (executor != null) {
+ throw new IllegalStateException("Registry Cleaner already scheduled");
+ }
+ executor = Executors.newSingleThreadScheduledExecutor();
+ executor.scheduleAtFixedRate(this::logStatistic, logIntervalMin, logIntervalMin, TimeUnit.MINUTES);
+ }
+
+ public void stop() {
+ if (executor != null) {
+ executor.shutdown();
+ }
+ }
+
+ public void logStatistic() {
+ Map top = getTopElements(intervalRegistry.getContent());
+ log(top);
+ }
+
+ protected Map getTopElements(Map countMap) {
+ MinMaxPriorityQueue> topQueue = MinMaxPriorityQueue
+ .orderedBy(Comparator.comparing((Function, Long>) Map.Entry::getValue).reversed())
+ .maximumSize(topSize)
+ .create(countMap.entrySet());
+
+ return topQueue.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ private void log(Map top) {
+ StringBuilder builder = new StringBuilder("Quota Statistic : ");
+ for (Map.Entry host : top.entrySet()) {
+ builder.append(host.getKey()).append(" : ").append(host.getValue());
+ }
+
+ log.info(builder.toString());
+ }
+}
diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/ClockTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/ClockTest.java
new file mode 100644
index 0000000000..6ed5445aec
--- /dev/null
+++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/ClockTest.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Vitaliy Paromskiy
+ * @version 1.0
+ */
+public class ClockTest {
+
+ @Before
+ public void init() {
+ Clock.reset();
+ }
+
+ @After
+ public void clear() {
+ Clock.reset();
+ }
+
+ @Test
+ public void defaultClockUseSystemTime() {
+ assertFalse(Clock.millis() > System.currentTimeMillis());
+ }
+
+ @Test
+ public void timeCanBeSet() {
+ Clock.setMillis(100L);
+ assertEquals(100L, Clock.millis());
+ }
+
+ @Test
+ public void clockCanBeReseted() {
+ Clock.setMillis(100L);
+ assertEquals(100L, Clock.millis());
+ Clock.reset();
+ assertFalse(Clock.millis() > System.currentTimeMillis());
+ }
+
+ @Test
+ public void timeIsShifted() {
+ Clock.setMillis(100L);
+ Clock.shift(50L);
+ assertEquals(150L, Clock.millis());
+ }
+
+}
\ No newline at end of file
diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicyTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicyTest.java
new file mode 100644
index 0000000000..f28d17c5f9
--- /dev/null
+++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicyTest.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Vitaliy Paromskiy
+ * @version 1.0
+ */
+public class HostRequestLimitPolicyTest {
+
+ private HostRequestLimitPolicy limitPolicy = new HostRequestLimitPolicy(10L);
+
+ @Test
+ public void ifCurrentValueLessThenLimitItIsValid() {
+ assertTrue(limitPolicy.isValid(9));
+ }
+
+ @Test
+ public void ifCurrentValueEqualsToLimitItIsValid() {
+ assertTrue(limitPolicy.isValid(10));
+ }
+
+ @Test
+ public void ifCurrentValueGreaterThenLimitItIsValid() {
+ assertFalse(limitPolicy.isValid(11));
+ }
+
+}
\ No newline at end of file
diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java
new file mode 100644
index 0000000000..475fcccead
--- /dev/null
+++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java
@@ -0,0 +1,64 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.thingsboard.server.common.transport.quota.inmemory.HostRequestIntervalRegistry;
+import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner;
+import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+
+/**
+ * @author Vitaliy Paromskiy
+ * @version 1.0
+ */
+public class HostRequestsQuotaServiceTest {
+
+ private HostRequestsQuotaService quotaService;
+
+ private HostRequestIntervalRegistry requestRegistry = mock(HostRequestIntervalRegistry.class);
+ private HostRequestLimitPolicy requestsPolicy = mock(HostRequestLimitPolicy.class);
+ private IntervalRegistryCleaner registryCleaner = mock(IntervalRegistryCleaner.class);
+ private IntervalRegistryLogger registryLogger = mock(IntervalRegistryLogger.class);
+
+ @Before
+ public void init() {
+ quotaService = new HostRequestsQuotaService(requestRegistry, requestsPolicy, registryCleaner, registryLogger, true);
+ }
+
+ @Test
+ public void hostQuotaValidated() {
+ when(requestRegistry.tick("key")).thenReturn(10L);
+ when(requestsPolicy.isValid(10L)).thenReturn(true);
+
+ assertTrue(quotaService.isQuotaExceeded("key"));
+
+ verify(requestRegistry).tick("key");
+ verify(requestsPolicy).isValid(10L);
+ verifyNoMoreInteractions(requestRegistry, requestsPolicy);
+ }
+
+ @Test
+ public void serviceCanBeDisabled() {
+ quotaService = new HostRequestsQuotaService(requestRegistry, requestsPolicy, registryCleaner, registryLogger, false);
+ assertFalse(quotaService.isQuotaExceeded("key"));
+ verifyNoMoreInteractions(requestRegistry, requestsPolicy);
+ }
+}
\ No newline at end of file
diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistryTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistryTest.java
new file mode 100644
index 0000000000..cb21ccebd5
--- /dev/null
+++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistryTest.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota.inmemory;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Vitaliy Paromskiy
+ * @version 1.0
+ */
+public class HostRequestIntervalRegistryTest {
+
+ private HostRequestIntervalRegistry registry;
+
+ @Before
+ public void init() {
+ registry = new HostRequestIntervalRegistry(10000L, 100L);
+ }
+
+ @Test
+ public void newHostCreateNewInterval() {
+ assertEquals(1L, registry.tick("host1"));
+ }
+
+ @Test
+ public void existingHostUpdated() {
+ registry.tick("aaa");
+ assertEquals(1L, registry.tick("bbb"));
+ assertEquals(2L, registry.tick("aaa"));
+ }
+
+ @Test
+ public void expiredIntervalsCleaned() throws InterruptedException {
+ registry.tick("aaa");
+ Thread.sleep(150L);
+ registry.tick("bbb");
+ registry.clean();
+ assertEquals(1L, registry.tick("aaa"));
+ assertEquals(2L, registry.tick("bbb"));
+ }
+}
\ No newline at end of file
diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCountTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCountTest.java
new file mode 100644
index 0000000000..7bdcafde3c
--- /dev/null
+++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalCountTest.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota.inmemory;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.thingsboard.server.common.transport.quota.Clock;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Vitaliy Paromskiy
+ * @version 1.0
+ */
+public class IntervalCountTest {
+
+ @Before
+ public void init() {
+ Clock.setMillis(1000L);
+ }
+
+ @After
+ public void clear() {
+ Clock.reset();
+ }
+
+ @Test
+ public void ticksInSameIntervalAreSummed() {
+ IntervalCount intervalCount = new IntervalCount(100L);
+ assertEquals(1L, intervalCount.resetIfExpiredAndTick());
+ Clock.shift(100);
+ assertEquals(2L, intervalCount.resetIfExpiredAndTick());
+ }
+
+ @Test
+ public void oldDataCleanedWhenIntervalExpired() {
+ IntervalCount intervalCount = new IntervalCount(100L);
+ assertEquals(1L, intervalCount.resetIfExpiredAndTick());
+ Clock.shift(101);
+ assertEquals(1L, intervalCount.resetIfExpiredAndTick());
+ }
+
+ @Test
+ public void silenceDurationCalculatedFromLastTick() {
+ IntervalCount intervalCount = new IntervalCount(100L);
+ assertEquals(1L, intervalCount.resetIfExpiredAndTick());
+ Clock.shift(10L);
+ assertEquals(10L, intervalCount.silenceDuration());
+ }
+
+}
\ No newline at end of file
diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLoggerTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLoggerTest.java
new file mode 100644
index 0000000000..cc25b4c402
--- /dev/null
+++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLoggerTest.java
@@ -0,0 +1,61 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota.inmemory;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * @author Vitaliy Paromskiy
+ * @version 1.0
+ */
+public class IntervalRegistryLoggerTest {
+
+ private IntervalRegistryLogger logger;
+
+ private HostRequestIntervalRegistry requestRegistry = mock(HostRequestIntervalRegistry.class);
+
+ @Before
+ public void init() {
+ logger = new IntervalRegistryLogger(3, 10, requestRegistry);
+ }
+
+ @Test
+ public void onlyMaxHostsCollected() {
+ Map map = ImmutableMap.of("a", 8L, "b", 3L, "c", 1L, "d", 3L);
+ Map actual = logger.getTopElements(map);
+ Map expected = ImmutableMap.of("a", 8L, "b", 3L, "d", 3L);
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void emptyMapProcessedCorrectly() {
+ Map map = Collections.emptyMap();
+ Map actual = logger.getTopElements(map);
+ Map expected = Collections.emptyMap();
+
+ assertEquals(expected, actual);
+ }
+
+}
\ No newline at end of file
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
index 834a911e77..2ffbf05f6e 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
@@ -34,6 +34,7 @@ import org.thingsboard.server.common.msg.session.*;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
import org.thingsboard.server.transport.coap.session.CoapExchangeObserverProxy;
import org.thingsboard.server.transport.coap.session.CoapSessionCtx;
@@ -51,13 +52,16 @@ public class CoapTransportResource extends CoapResource {
private final CoapTransportAdaptor adaptor;
private final SessionMsgProcessor processor;
private final DeviceAuthService authService;
+ private final QuotaService quotaService;
private final Field observerField;
private final long timeout;
- public CoapTransportResource(SessionMsgProcessor processor, DeviceAuthService authService, CoapTransportAdaptor adaptor, String name, long timeout) {
+ public CoapTransportResource(SessionMsgProcessor processor, DeviceAuthService authService, CoapTransportAdaptor adaptor, String name,
+ long timeout, QuotaService quotaService) {
super(name);
this.processor = processor;
this.authService = authService;
+ this.quotaService = quotaService;
this.adaptor = adaptor;
this.timeout = timeout;
// This is important to turn off existing observable logic in
@@ -70,6 +74,13 @@ public class CoapTransportResource extends CoapResource {
@Override
public void handleGET(CoapExchange exchange) {
+ if(quotaService.isQuotaExceeded(exchange.getSourceAddress().getHostAddress())) {
+ log.warn("Missing feature type parameter");
+ log.warn("COAP Quota exceeded for [{}:{}] . Disconnect", exchange.getSourceAddress().getHostAddress(), exchange.getSourcePort());
+ exchange.respond(ResponseCode.BAD_REQUEST);
+ return;
+ }
+
Optional featureType = getFeatureType(exchange.advanced().getRequest());
if (!featureType.isPresent()) {
log.trace("Missing feature type parameter");
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java
index a78c718d4f..7dcf77c036 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java
@@ -15,26 +15,24 @@
*/
package org.thingsboard.server.transport.coap;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.CoapResource;
import org.eclipse.californium.core.CoapServer;
import org.eclipse.californium.core.network.CoapEndpoint;
-import org.thingsboard.server.common.transport.SessionMsgProcessor;
-import org.thingsboard.server.common.transport.auth.DeviceAuthService;
-import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.transport.SessionMsgProcessor;
+import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.common.transport.quota.QuotaService;
+import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
@Service("CoapTransportService")
@Slf4j
@@ -54,6 +52,9 @@ public class CoapTransportService {
@Autowired(required = false)
private DeviceAuthService authService;
+ @Autowired(required = false)
+ private QuotaService quotaService;
+
@Value("${coap.bind_address}")
private String host;
@@ -83,7 +84,7 @@ public class CoapTransportService {
private void createResources() {
CoapResource api = new CoapResource(API);
- api.add(new CoapTransportResource(processor, authService, adaptor, V1, timeout));
+ api.add(new CoapTransportResource(processor, authService, adaptor, V1, timeout, quotaService));
server.add(api);
}
diff --git a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
index 706af80ec9..5ac589099a 100644
--- a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
+++ b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
@@ -50,6 +50,7 @@ import org.thingsboard.server.common.msg.session.*;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthResult;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.common.transport.quota.QuotaService;
import java.util.ArrayList;
import java.util.List;
@@ -131,6 +132,11 @@ public class CoapServerTest {
}
};
}
+
+ @Bean
+ public static QuotaService quotaService() {
+ return key -> false;
+ }
}
@Autowired
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
index 70767afdfc..c799dcf7b7 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
@@ -35,10 +35,11 @@ import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.transport.http.session.HttpSessionCtx;
+import javax.servlet.http.HttpServletRequest;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -59,11 +60,18 @@ public class DeviceApiController {
@Autowired(required = false)
private DeviceAuthService authService;
+ @Autowired(required = false)
+ private QuotaService quotaService;
+
@RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json")
public DeferredResult getDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
@RequestParam(value = "clientKeys", required = false, defaultValue = "") String clientKeys,
- @RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys) {
+ @RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys,
+ HttpServletRequest httpRequest) {
DeferredResult responseWriter = new DeferredResult();
+ if (quotaExceeded(httpRequest, responseWriter)) {
+ return responseWriter;
+ }
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
GetAttributesRequest request;
@@ -84,8 +92,11 @@ public class DeviceApiController {
@RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.POST)
public DeferredResult postDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
- @RequestBody String json) {
+ @RequestBody String json, HttpServletRequest request) {
DeferredResult responseWriter = new DeferredResult();
+ if (quotaExceeded(request, responseWriter)) {
+ return responseWriter;
+ }
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
@@ -101,8 +112,11 @@ public class DeviceApiController {
@RequestMapping(value = "/{deviceToken}/telemetry", method = RequestMethod.POST)
public DeferredResult postTelemetry(@PathVariable("deviceToken") String deviceToken,
- @RequestBody String json) {
+ @RequestBody String json, HttpServletRequest request) {
DeferredResult responseWriter = new DeferredResult();
+ if (quotaExceeded(request, responseWriter)) {
+ return responseWriter;
+ }
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
@@ -118,15 +132,20 @@ public class DeviceApiController {
@RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.GET, produces = "application/json")
public DeferredResult subscribeToCommands(@PathVariable("deviceToken") String deviceToken,
- @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout) {
- return subscribe(deviceToken, timeout, new RpcSubscribeMsg());
+ @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
+ HttpServletRequest request) {
+
+ return subscribe(deviceToken, timeout, new RpcSubscribeMsg(), request);
}
@RequestMapping(value = "/{deviceToken}/rpc/{requestId}", method = RequestMethod.POST)
public DeferredResult replyToCommand(@PathVariable("deviceToken") String deviceToken,
@PathVariable("requestId") Integer requestId,
- @RequestBody String json) {
+ @RequestBody String json, HttpServletRequest request) {
DeferredResult responseWriter = new DeferredResult();
+ if (quotaExceeded(request, responseWriter)) {
+ return responseWriter;
+ }
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
@@ -143,8 +162,11 @@ public class DeviceApiController {
@RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.POST)
public DeferredResult postRpcRequest(@PathVariable("deviceToken") String deviceToken,
- @RequestBody String json) {
+ @RequestBody String json, HttpServletRequest httpRequest) {
DeferredResult responseWriter = new DeferredResult();
+ if (quotaExceeded(httpRequest, responseWriter)) {
+ return responseWriter;
+ }
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
@@ -163,12 +185,17 @@ public class DeviceApiController {
@RequestMapping(value = "/{deviceToken}/attributes/updates", method = RequestMethod.GET, produces = "application/json")
public DeferredResult subscribeToAttributes(@PathVariable("deviceToken") String deviceToken,
- @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout) {
- return subscribe(deviceToken, timeout, new AttributesSubscribeMsg());
+ @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
+ HttpServletRequest httpRequest) {
+
+ return subscribe(deviceToken, timeout, new AttributesSubscribeMsg(), httpRequest);
}
- private DeferredResult subscribe(String deviceToken, long timeout, FromDeviceMsg msg) {
+ private DeferredResult subscribe(String deviceToken, long timeout, FromDeviceMsg msg, HttpServletRequest httpRequest) {
DeferredResult responseWriter = new DeferredResult();
+ if (quotaExceeded(httpRequest, responseWriter)) {
+ return responseWriter;
+ }
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
@@ -195,4 +222,13 @@ public class DeviceApiController {
processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg));
}
+ private boolean quotaExceeded(HttpServletRequest request, DeferredResult responseWriter) {
+ if (quotaService.isQuotaExceeded(request.getRemoteAddr())) {
+ log.warn("REST Quota exceeded for [{}] . Disconnect", request.getRemoteAddr());
+ responseWriter.setResult(new ResponseEntity<>(HttpStatus.BANDWIDTH_LIMIT_EXCEEDED));
+ return true;
+ }
+ return false;
+ }
+
}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 2e6abfde96..6fd559bdbf 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -16,7 +16,6 @@
package org.thingsboard.server.transport.mqtt;
import com.fasterxml.jackson.databind.JsonNode;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.*;
@@ -36,18 +35,18 @@ import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.dao.EncryptionUtil;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
-import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
+import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
import org.thingsboard.server.transport.mqtt.util.SslUtil;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.X509Certificate;
import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
@@ -72,13 +71,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final DeviceService deviceService;
private final DeviceAuthService authService;
private final RelationService relationService;
+ private final QuotaService quotaService;
private final SslHandler sslHandler;
private volatile boolean connected;
private volatile InetSocketAddress address;
private volatile GatewaySessionCtx gatewaySessionCtx;
public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
- MqttTransportAdaptor adaptor, SslHandler sslHandler) {
+ MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) {
this.processor = processor;
this.deviceService = deviceService;
this.relationService = relationService;
@@ -87,6 +87,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor);
this.sessionId = deviceSessionCtx.getSessionId().toUidStr();
this.sslHandler = sslHandler;
+ this.quotaService = quotaService;
}
@Override
@@ -102,35 +103,43 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (msg.fixedHeader() == null) {
log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
processDisconnect(ctx);
- } else {
- deviceSessionCtx.setChannel(ctx);
- switch (msg.fixedHeader().messageType()) {
- case CONNECT:
- processConnect(ctx, (MqttConnectMessage) msg);
- break;
- case PUBLISH:
- processPublish(ctx, (MqttPublishMessage) msg);
- break;
- case SUBSCRIBE:
- processSubscribe(ctx, (MqttSubscribeMessage) msg);
- break;
- case UNSUBSCRIBE:
- processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
- break;
- case PINGREQ:
- if (checkConnected(ctx)) {
- ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
- }
- break;
- case DISCONNECT:
- if (checkConnected(ctx)) {
- processDisconnect(ctx);
- }
- break;
- default:
- break;
- }
+ return;
}
+
+ if (quotaService.isQuotaExceeded(address.getHostName())) {
+ log.warn("MQTT Quota exceeded for [{}:{}] . Disconnect", address.getHostName(), address.getPort());
+ processDisconnect(ctx);
+ return;
+ }
+
+ deviceSessionCtx.setChannel(ctx);
+ switch (msg.fixedHeader().messageType()) {
+ case CONNECT:
+ processConnect(ctx, (MqttConnectMessage) msg);
+ break;
+ case PUBLISH:
+ processPublish(ctx, (MqttPublishMessage) msg);
+ break;
+ case SUBSCRIBE:
+ processSubscribe(ctx, (MqttSubscribeMessage) msg);
+ break;
+ case UNSUBSCRIBE:
+ processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
+ break;
+ case PINGREQ:
+ if (checkConnected(ctx)) {
+ ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
+ }
+ break;
+ case DISCONNECT:
+ if (checkConnected(ctx)) {
+ processDisconnect(ctx);
+ }
+ break;
+ default:
+ break;
+ }
+
}
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
index 1469290837..93812a80a8 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
@@ -15,27 +15,19 @@
*/
package org.thingsboard.server.transport.mqtt;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
-import io.netty.handler.ssl.util.SelfSignedCertificate;
-import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
-import javax.net.ssl.SSLException;
-import java.security.cert.CertificateException;
-
/**
* @author Andrew Shvayka
*/
@@ -49,16 +41,18 @@ public class MqttTransportServerInitializer extends ChannelInitializer