From 58d9c313a89d3929ad7c3d0e46ed287b91132355 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 20 Apr 2020 16:12:03 +0300 Subject: [PATCH] Fixed tests --- .../DefaultRuleEngineStatisticsService.java | 11 +- .../controller/BaseAssetControllerTest.java | 128 ++++++++++-------- .../BaseEntityViewControllerTest.java | 6 +- .../controller/ControllerNoSqlTestSuite.java | 7 + .../controller/ControllerSqlTestSuite.java | 7 + .../server/mqtt/MqttNoSqlTestSuite.java | 7 + .../server/mqtt/MqttSqlTestSuite.java | 7 + .../rules/RuleEngineNoSqlTestSuite.java | 7 + .../server/rules/RuleEngineSqlTestSuite.java | 7 + .../server/system/SystemNoSqlTestSuite.java | 7 + .../server/system/SystemSqlTestSuite.java | 7 + .../server/queue/memory/InMemoryStorage.java | 42 +++--- .../queue/memory/InMemoryTbQueueConsumer.java | 11 +- tools/pom.xml | 2 +- 14 files changed, 173 insertions(+), 83 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java b/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java index 3951703351..a506b87ab0 100644 --- a/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java +++ b/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java @@ -47,6 +47,7 @@ import java.util.stream.Collectors; @Slf4j public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsService { + public static final String TB_SERVICE_QUEUE = "TbServiceQueue"; public static final FutureCallback CALLBACK = new FutureCallback() { @Override public void onSuccess(@Nullable Void result) { @@ -95,7 +96,13 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS }); ruleEngineStats.getTenantExceptions().forEach((tenantId, e) -> { TsKvEntry tsKv = new BasicTsKvEntry(ts, new JsonDataEntry("ruleEngineException", e.toJsonString())); - tsService.saveAndNotify(tenantId, getServiceAssetId(tenantId, queueName), Collections.singletonList(tsKv), CALLBACK); + try { + tsService.saveAndNotify(tenantId, getServiceAssetId(tenantId, queueName), Collections.singletonList(tsKv), CALLBACK); + } catch (DataValidationException e2) { + if (!e2.getMessage().equalsIgnoreCase("Asset is referencing to non-existent tenant!")) { + throw e2; + } + } }); ruleEngineStats.reset(); } @@ -113,7 +120,7 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS asset = new Asset(); asset.setTenantId(tenantId); asset.setName(queueName + "_" + serviceInfoProvider.getServiceId()); - asset.setType("TbServiceQueue"); + asset.setType(TB_SERVICE_QUEUE); asset = assetService.saveAsset(asset); } assetId = asset.getId(); diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseAssetControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseAssetControllerTest.java index 56c73b5e13..0422a85416 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseAssetControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseAssetControllerTest.java @@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageLink; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.dao.model.ModelConstants; +import org.thingsboard.server.service.stats.DefaultRuleEngineStatisticsService; import java.util.ArrayList; import java.util.Collections; @@ -71,7 +72,7 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { public void afterTest() throws Exception { loginSysAdmin(); - doDelete("/api/tenant/"+savedTenant.getId().getId().toString()) + doDelete("/api/tenant/" + savedTenant.getId().getId().toString()) .andExpect(status().isOk()); } @@ -111,26 +112,27 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { @Test public void testFindAssetTypesByTenantId() throws Exception { List assets = new ArrayList<>(); - for (int i=0;i<3;i++) { + for (int i = 0; i < 3; i++) { Asset asset = new Asset(); - asset.setName("My asset B"+i); + asset.setName("My asset B" + i); asset.setType("typeB"); assets.add(doPost("/api/asset", asset, Asset.class)); } - for (int i=0;i<7;i++) { + for (int i = 0; i < 7; i++) { Asset asset = new Asset(); - asset.setName("My asset C"+i); + asset.setName("My asset C" + i); asset.setType("typeC"); assets.add(doPost("/api/asset", asset, Asset.class)); } - for (int i=0;i<9;i++) { + for (int i = 0; i < 9; i++) { Asset asset = new Asset(); - asset.setName("My asset A"+i); + asset.setName("My asset A" + i); asset.setType("typeA"); assets.add(doPost("/api/asset", asset, Asset.class)); } List assetTypes = doGetTyped("/api/asset/types", - new TypeReference>(){}); + new TypeReference>() { + }); Assert.assertNotNull(assetTypes); Assert.assertEquals(3, assetTypes.size()); @@ -146,10 +148,10 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { asset.setType("default"); Asset savedAsset = doPost("/api/asset", asset, Asset.class); - doDelete("/api/asset/"+savedAsset.getId().getId().toString()) + doDelete("/api/asset/" + savedAsset.getId().getId().toString()) .andExpect(status().isOk()); - doGet("/api/asset/"+savedAsset.getId().getId().toString()) + doGet("/api/asset/" + savedAsset.getId().getId().toString()) .andExpect(status().isNotFound()); } @@ -244,16 +246,16 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { loginSysAdmin(); - doDelete("/api/tenant/"+savedTenant2.getId().getId().toString()) + doDelete("/api/tenant/" + savedTenant2.getId().getId().toString()) .andExpect(status().isOk()); } @Test public void testFindTenantAssets() throws Exception { List assets = new ArrayList<>(); - for (int i=0;i<178;i++) { + for (int i = 0; i < 178; i++) { Asset asset = new Asset(); - asset.setName("Asset"+i); + asset.setName("Asset" + i); asset.setType("default"); assets.add(doPost("/api/asset", asset, Asset.class)); } @@ -262,13 +264,16 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { TextPageData pageData = null; do { pageData = doGetTypedWithPageLink("/api/tenant/assets?", - new TypeReference>(){}, pageLink); + new TypeReference>() { + }, pageLink); loadedAssets.addAll(pageData.getData()); if (pageData.hasNext()) { pageLink = pageData.getNextPageLink(); } } while (pageData.hasNext()); + loadedAssets.removeIf(asset -> asset.getType().equals(DefaultRuleEngineStatisticsService.TB_SERVICE_QUEUE)); + Collections.sort(assets, idComparator); Collections.sort(loadedAssets, idComparator); @@ -279,10 +284,10 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { public void testFindTenantAssetsByName() throws Exception { String title1 = "Asset title 1"; List assetsTitle1 = new ArrayList<>(); - for (int i=0;i<143;i++) { + for (int i = 0; i < 143; i++) { Asset asset = new Asset(); String suffix = RandomStringUtils.randomAlphanumeric(15); - String name = title1+suffix; + String name = title1 + suffix; name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase(); asset.setName(name); asset.setType("default"); @@ -290,10 +295,10 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { } String title2 = "Asset title 2"; List assetsTitle2 = new ArrayList<>(); - for (int i=0;i<75;i++) { + for (int i = 0; i < 75; i++) { Asset asset = new Asset(); String suffix = RandomStringUtils.randomAlphanumeric(15); - String name = title2+suffix; + String name = title2 + suffix; name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase(); asset.setName(name); asset.setType("default"); @@ -305,7 +310,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { TextPageData pageData = null; do { pageData = doGetTypedWithPageLink("/api/tenant/assets?", - new TypeReference>(){}, pageLink); + new TypeReference>() { + }, pageLink); loadedAssetsTitle1.addAll(pageData.getData()); if (pageData.hasNext()) { pageLink = pageData.getNextPageLink(); @@ -321,7 +327,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { pageLink = new TextPageLink(4, title2); do { pageData = doGetTypedWithPageLink("/api/tenant/assets?", - new TypeReference>(){}, pageLink); + new TypeReference>() { + }, pageLink); loadedAssetsTitle2.addAll(pageData.getData()); if (pageData.hasNext()) { pageLink = pageData.getNextPageLink(); @@ -334,24 +341,26 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { Assert.assertEquals(assetsTitle2, loadedAssetsTitle2); for (Asset asset : loadedAssetsTitle1) { - doDelete("/api/asset/"+asset.getId().getId().toString()) + doDelete("/api/asset/" + asset.getId().getId().toString()) .andExpect(status().isOk()); } pageLink = new TextPageLink(4, title1); pageData = doGetTypedWithPageLink("/api/tenant/assets?", - new TypeReference>(){}, pageLink); + new TypeReference>() { + }, pageLink); Assert.assertFalse(pageData.hasNext()); Assert.assertEquals(0, pageData.getData().size()); for (Asset asset : loadedAssetsTitle2) { - doDelete("/api/asset/"+asset.getId().getId().toString()) + doDelete("/api/asset/" + asset.getId().getId().toString()) .andExpect(status().isOk()); } pageLink = new TextPageLink(4, title2); pageData = doGetTypedWithPageLink("/api/tenant/assets?", - new TypeReference>(){}, pageLink); + new TypeReference>() { + }, pageLink); Assert.assertFalse(pageData.hasNext()); Assert.assertEquals(0, pageData.getData().size()); } @@ -361,10 +370,10 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { String title1 = "Asset title 1"; String type1 = "typeA"; List assetsType1 = new ArrayList<>(); - for (int i=0;i<143;i++) { + for (int i = 0; i < 143; i++) { Asset asset = new Asset(); String suffix = RandomStringUtils.randomAlphanumeric(15); - String name = title1+suffix; + String name = title1 + suffix; name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase(); asset.setName(name); asset.setType(type1); @@ -373,10 +382,10 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { String title2 = "Asset title 2"; String type2 = "typeB"; List assetsType2 = new ArrayList<>(); - for (int i=0;i<75;i++) { + for (int i = 0; i < 75; i++) { Asset asset = new Asset(); String suffix = RandomStringUtils.randomAlphanumeric(15); - String name = title2+suffix; + String name = title2 + suffix; name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase(); asset.setName(name); asset.setType(type2); @@ -388,7 +397,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { TextPageData pageData = null; do { pageData = doGetTypedWithPageLink("/api/tenant/assets?type={type}&", - new TypeReference>(){}, pageLink, type1); + new TypeReference>() { + }, pageLink, type1); loadedAssetsType1.addAll(pageData.getData()); if (pageData.hasNext()) { pageLink = pageData.getNextPageLink(); @@ -404,7 +414,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { pageLink = new TextPageLink(4); do { pageData = doGetTypedWithPageLink("/api/tenant/assets?type={type}&", - new TypeReference>(){}, pageLink, type2); + new TypeReference>() { + }, pageLink, type2); loadedAssetsType2.addAll(pageData.getData()); if (pageData.hasNext()) { pageLink = pageData.getNextPageLink(); @@ -417,24 +428,26 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { Assert.assertEquals(assetsType2, loadedAssetsType2); for (Asset asset : loadedAssetsType1) { - doDelete("/api/asset/"+asset.getId().getId().toString()) + doDelete("/api/asset/" + asset.getId().getId().toString()) .andExpect(status().isOk()); } pageLink = new TextPageLink(4); pageData = doGetTypedWithPageLink("/api/tenant/assets?type={type}&", - new TypeReference>(){}, pageLink, type1); + new TypeReference>() { + }, pageLink, type1); Assert.assertFalse(pageData.hasNext()); Assert.assertEquals(0, pageData.getData().size()); for (Asset asset : loadedAssetsType2) { - doDelete("/api/asset/"+asset.getId().getId().toString()) + doDelete("/api/asset/" + asset.getId().getId().toString()) .andExpect(status().isOk()); } pageLink = new TextPageLink(4); pageData = doGetTypedWithPageLink("/api/tenant/assets?type={type}&", - new TypeReference>(){}, pageLink, type2); + new TypeReference>() { + }, pageLink, type2); Assert.assertFalse(pageData.hasNext()); Assert.assertEquals(0, pageData.getData().size()); } @@ -447,9 +460,9 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { CustomerId customerId = customer.getId(); List assets = new ArrayList<>(); - for (int i=0;i<128;i++) { + for (int i = 0; i < 128; i++) { Asset asset = new Asset(); - asset.setName("Asset"+i); + asset.setName("Asset" + i); asset.setType("default"); asset = doPost("/api/asset", asset, Asset.class); assets.add(doPost("/api/customer/" + customerId.getId().toString() @@ -461,7 +474,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { TextPageData pageData = null; do { pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?", - new TypeReference>(){}, pageLink); + new TypeReference>() { + }, pageLink); loadedAssets.addAll(pageData.getData()); if (pageData.hasNext()) { pageLink = pageData.getNextPageLink(); @@ -483,10 +497,10 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { String title1 = "Asset title 1"; List assetsTitle1 = new ArrayList<>(); - for (int i=0;i<125;i++) { + for (int i = 0; i < 125; i++) { Asset asset = new Asset(); String suffix = RandomStringUtils.randomAlphanumeric(15); - String name = title1+suffix; + String name = title1 + suffix; name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase(); asset.setName(name); asset.setType("default"); @@ -496,10 +510,10 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { } String title2 = "Asset title 2"; List assetsTitle2 = new ArrayList<>(); - for (int i=0;i<143;i++) { + for (int i = 0; i < 143; i++) { Asset asset = new Asset(); String suffix = RandomStringUtils.randomAlphanumeric(15); - String name = title2+suffix; + String name = title2 + suffix; name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase(); asset.setName(name); asset.setType("default"); @@ -513,7 +527,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { TextPageData pageData = null; do { pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?", - new TypeReference>(){}, pageLink); + new TypeReference>() { + }, pageLink); loadedAssetsTitle1.addAll(pageData.getData()); if (pageData.hasNext()) { pageLink = pageData.getNextPageLink(); @@ -529,7 +544,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { pageLink = new TextPageLink(4, title2); do { pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?", - new TypeReference>(){}, pageLink); + new TypeReference>() { + }, pageLink); loadedAssetsTitle2.addAll(pageData.getData()); if (pageData.hasNext()) { pageLink = pageData.getNextPageLink(); @@ -548,7 +564,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { pageLink = new TextPageLink(4, title1); pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?", - new TypeReference>(){}, pageLink); + new TypeReference>() { + }, pageLink); Assert.assertFalse(pageData.hasNext()); Assert.assertEquals(0, pageData.getData().size()); @@ -559,7 +576,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { pageLink = new TextPageLink(4, title2); pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?", - new TypeReference>(){}, pageLink); + new TypeReference>() { + }, pageLink); Assert.assertFalse(pageData.hasNext()); Assert.assertEquals(0, pageData.getData().size()); } @@ -574,10 +592,10 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { String title1 = "Asset title 1"; String type1 = "typeC"; List assetsType1 = new ArrayList<>(); - for (int i=0;i<125;i++) { + for (int i = 0; i < 125; i++) { Asset asset = new Asset(); String suffix = RandomStringUtils.randomAlphanumeric(15); - String name = title1+suffix; + String name = title1 + suffix; name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase(); asset.setName(name); asset.setType(type1); @@ -588,10 +606,10 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { String title2 = "Asset title 2"; String type2 = "typeD"; List assetsType2 = new ArrayList<>(); - for (int i=0;i<143;i++) { + for (int i = 0; i < 143; i++) { Asset asset = new Asset(); String suffix = RandomStringUtils.randomAlphanumeric(15); - String name = title2+suffix; + String name = title2 + suffix; name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase(); asset.setName(name); asset.setType(type2); @@ -605,7 +623,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { TextPageData pageData = null; do { pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?type={type}&", - new TypeReference>(){}, pageLink, type1); + new TypeReference>() { + }, pageLink, type1); loadedAssetsType1.addAll(pageData.getData()); if (pageData.hasNext()) { pageLink = pageData.getNextPageLink(); @@ -621,7 +640,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { pageLink = new TextPageLink(4); do { pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?type={type}&", - new TypeReference>(){}, pageLink, type2); + new TypeReference>() { + }, pageLink, type2); loadedAssetsType2.addAll(pageData.getData()); if (pageData.hasNext()) { pageLink = pageData.getNextPageLink(); @@ -640,7 +660,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { pageLink = new TextPageLink(4); pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?type={type}&", - new TypeReference>(){}, pageLink, type1); + new TypeReference>() { + }, pageLink, type1); Assert.assertFalse(pageData.hasNext()); Assert.assertEquals(0, pageData.getData().size()); @@ -651,7 +672,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest { pageLink = new TextPageLink(4); pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?type={type}&", - new TypeReference>(){}, pageLink, type2); + new TypeReference>() { + }, pageLink, type2); Assert.assertFalse(pageData.hasNext()); Assert.assertEquals(0, pageData.getData().size()); } diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseEntityViewControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseEntityViewControllerTest.java index e60f549610..420446e04d 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseEntityViewControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseEntityViewControllerTest.java @@ -426,7 +426,7 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes message.setPayload(strKvs.getBytes()); client.publish("v1/devices/me/telemetry", message); Thread.sleep(1000); -// client.disconnect(); + client.disconnect(); } private void awaitConnected(MqttAsyncClient client, long ms) throws InterruptedException { @@ -463,13 +463,13 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(accessToken); client.connect(options); - Thread.sleep(3000); + awaitConnected(client, TimeUnit.SECONDS.toMillis(30)); MqttMessage message = new MqttMessage(); message.setPayload((stringKV).getBytes()); client.publish("v1/devices/me/attributes", message); Thread.sleep(1000); - + client.disconnect(); return new HashSet<>(doGetAsync("/api/plugins/telemetry/DEVICE/" + viewDeviceId + "/keys/attributes", List.class)); } diff --git a/application/src/test/java/org/thingsboard/server/controller/ControllerNoSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/controller/ControllerNoSqlTestSuite.java index 4ad41824a0..781c483fc5 100644 --- a/application/src/test/java/org/thingsboard/server/controller/ControllerNoSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/controller/ControllerNoSqlTestSuite.java @@ -16,10 +16,12 @@ package org.thingsboard.server.controller; import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.extensions.cpsuite.ClasspathSuite; import org.junit.runner.RunWith; import org.thingsboard.server.dao.CustomCassandraCQLUnit; +import org.thingsboard.server.queue.memory.InMemoryStorage; import java.util.Arrays; @@ -37,4 +39,9 @@ public class ControllerNoSqlTestSuite { new ClassPathCQLDataSet("cassandra/system-data.cql", false, false), new ClassPathCQLDataSet("cassandra/system-test.cql", false, false)), "cassandra-test.yaml", 30000l); + + @BeforeClass + public static void cleanupInMemStorage(){ + InMemoryStorage.getInstance().cleanup(); + } } diff --git a/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java index 8dc0acff57..347eaee7eb 100644 --- a/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java @@ -15,10 +15,12 @@ */ package org.thingsboard.server.controller; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.extensions.cpsuite.ClasspathSuite; import org.junit.runner.RunWith; import org.thingsboard.server.dao.CustomSqlUnit; +import org.thingsboard.server.queue.memory.InMemoryStorage; import java.util.Arrays; @@ -33,4 +35,9 @@ public class ControllerSqlTestSuite { Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql"), "sql/drop-all-tables.sql", "sql-test.properties"); + + @BeforeClass + public static void cleanupInMemStorage(){ + InMemoryStorage.getInstance().cleanup(); + } } diff --git a/application/src/test/java/org/thingsboard/server/mqtt/MqttNoSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/mqtt/MqttNoSqlTestSuite.java index 9c4030cff4..7360c5c506 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/MqttNoSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/MqttNoSqlTestSuite.java @@ -16,10 +16,12 @@ package org.thingsboard.server.mqtt; import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.extensions.cpsuite.ClasspathSuite; import org.junit.runner.RunWith; import org.thingsboard.server.dao.CustomCassandraCQLUnit; +import org.thingsboard.server.queue.memory.InMemoryStorage; import java.util.Arrays; @@ -36,4 +38,9 @@ public class MqttNoSqlTestSuite { new ClassPathCQLDataSet("cassandra/schema-entities.cql", false, false), new ClassPathCQLDataSet("cassandra/system-data.cql", false, false)), "cassandra-test.yaml", 30000l); + + @BeforeClass + public static void cleanupInMemStorage(){ + InMemoryStorage.getInstance().cleanup(); + } } diff --git a/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java index 2863589ba1..70de3b27ca 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java @@ -15,10 +15,12 @@ */ package org.thingsboard.server.mqtt; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.extensions.cpsuite.ClasspathSuite; import org.junit.runner.RunWith; import org.thingsboard.server.dao.CustomSqlUnit; +import org.thingsboard.server.queue.memory.InMemoryStorage; import java.util.Arrays; @@ -32,4 +34,9 @@ public class MqttSqlTestSuite { Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"), "sql/drop-all-tables.sql", "sql-test.properties"); + + @BeforeClass + public static void cleanupInMemStorage(){ + InMemoryStorage.getInstance().cleanup(); + } } diff --git a/application/src/test/java/org/thingsboard/server/rules/RuleEngineNoSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/rules/RuleEngineNoSqlTestSuite.java index e44c383452..fbab13147c 100644 --- a/application/src/test/java/org/thingsboard/server/rules/RuleEngineNoSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/rules/RuleEngineNoSqlTestSuite.java @@ -16,11 +16,13 @@ package org.thingsboard.server.rules; import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.extensions.cpsuite.ClasspathSuite; import org.junit.runner.RunWith; import org.thingsboard.server.dao.CustomCassandraCQLUnit; import org.thingsboard.server.dao.CustomSqlUnit; +import org.thingsboard.server.queue.memory.InMemoryStorage; import java.util.Arrays; @@ -40,4 +42,9 @@ public class RuleEngineNoSqlTestSuite { new ClassPathCQLDataSet("cassandra/system-data.cql", false, false)), "cassandra-test.yaml", 30000l); + @BeforeClass + public static void cleanupInMemStorage(){ + InMemoryStorage.getInstance().cleanup(); + } + } diff --git a/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java index 5f930821f7..83ac5f7703 100644 --- a/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java @@ -15,10 +15,12 @@ */ package org.thingsboard.server.rules; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.extensions.cpsuite.ClasspathSuite; import org.junit.runner.RunWith; import org.thingsboard.server.dao.CustomSqlUnit; +import org.thingsboard.server.queue.memory.InMemoryStorage; import java.util.Arrays; @@ -33,4 +35,9 @@ public class RuleEngineSqlTestSuite { Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"), "sql/drop-all-tables.sql", "sql-test.properties"); + + @BeforeClass + public static void cleanupInMemStorage(){ + InMemoryStorage.getInstance().cleanup(); + } } diff --git a/application/src/test/java/org/thingsboard/server/system/SystemNoSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/system/SystemNoSqlTestSuite.java index 41be7641c6..c4182db3ee 100644 --- a/application/src/test/java/org/thingsboard/server/system/SystemNoSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/system/SystemNoSqlTestSuite.java @@ -16,10 +16,12 @@ package org.thingsboard.server.system; import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.extensions.cpsuite.ClasspathSuite; import org.junit.runner.RunWith; import org.thingsboard.server.dao.CustomCassandraCQLUnit; +import org.thingsboard.server.queue.memory.InMemoryStorage; import java.util.Arrays; @@ -38,4 +40,9 @@ public class SystemNoSqlTestSuite { new ClassPathCQLDataSet("cassandra/schema-entities.cql", false, false), new ClassPathCQLDataSet("cassandra/system-data.cql", false, false)), "cassandra-test.yaml", 30000l); + + @BeforeClass + public static void cleanupInMemStorage(){ + InMemoryStorage.getInstance().cleanup(); + } } diff --git a/application/src/test/java/org/thingsboard/server/system/SystemSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/system/SystemSqlTestSuite.java index b12d513ce0..6060a2cc2b 100644 --- a/application/src/test/java/org/thingsboard/server/system/SystemSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/system/SystemSqlTestSuite.java @@ -15,10 +15,12 @@ */ package org.thingsboard.server.system; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.extensions.cpsuite.ClasspathSuite; import org.junit.runner.RunWith; import org.thingsboard.server.dao.CustomSqlUnit; +import org.thingsboard.server.queue.memory.InMemoryStorage; import java.util.Arrays; @@ -35,4 +37,9 @@ public class SystemSqlTestSuite { "sql/drop-all-tables.sql", "sql-test.properties"); + @BeforeClass + public static void cleanupInMemStorage(){ + InMemoryStorage.getInstance().cleanup(); + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java index f919df871e..e119d1433b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryStorage.java @@ -30,11 +30,9 @@ import java.util.concurrent.TimeUnit; public final class InMemoryStorage { private static InMemoryStorage instance; private final ConcurrentHashMap> storage; - private volatile boolean stopped; private InMemoryStorage() { storage = new ConcurrentHashMap<>(); - stopped = false; } public static InMemoryStorage getInstance() { @@ -52,33 +50,31 @@ public final class InMemoryStorage { return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingQueue<>()).add(msg); } - public List get(String topic, long durationInMillis) { + public List get(String topic, long durationInMillis) throws InterruptedException { if (storage.containsKey(topic)) { - try { - List entities; - T first = (T) storage.get(topic).poll(durationInMillis, TimeUnit.MILLISECONDS); - if (first != null) { - entities = new ArrayList<>(); - entities.add(first); - List otherList = new ArrayList<>(); - storage.get(topic).drainTo(otherList, 999); - for (TbQueueMsg other : otherList) { - entities.add((T) other); - } - } else { - entities = Collections.emptyList(); - } - return entities; - } catch (InterruptedException e) { - if (!stopped) { - log.warn("Queue was interrupted", e); + List entities; + T first = (T) storage.get(topic).poll(durationInMillis, TimeUnit.MILLISECONDS); + if (first != null) { + entities = new ArrayList<>(); + entities.add(first); + List otherList = new ArrayList<>(); + storage.get(topic).drainTo(otherList, 999); + for (TbQueueMsg other : otherList) { + entities.add((T) other); } + } else { + entities = Collections.emptyList(); } + return entities; } return Collections.emptyList(); } - public void stop() { - stopped = true; + /** + * Used primarily for testing. + */ + public void cleanup() { + storage.clear(); } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java index 2b81839540..3920f143f1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java @@ -66,7 +66,16 @@ public class InMemoryTbQueueConsumer implements TbQueueCon if (subscribed) { List messages = partitions .stream() - .map(tpi -> storage.get(tpi.getFullTopicName(), durationInMillis)) + .map(tpi -> { + try { + return storage.get(tpi.getFullTopicName(), durationInMillis); + } catch (InterruptedException e) { + if (!stopped) { + log.error("Queue was interrupted.", e); + } + return Collections.emptyList(); + } + }) .flatMap(List::stream) .map(msg -> (T) msg).collect(Collectors.toList()); if (messages.size() > 0) { diff --git a/tools/pom.xml b/tools/pom.xml index 64027c478e..1960d330d0 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -54,7 +54,7 @@ org.apache.cassandra cassandra-all - 3.11.4 + 3.11.6 com.datastax.cassandra