Fixed tests

This commit is contained in:
Andrii Shvaika 2020-04-20 16:12:03 +03:00
parent 341f0d14ab
commit 58d9c313a8
14 changed files with 173 additions and 83 deletions

View File

@ -47,6 +47,7 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsService { public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsService {
public static final String TB_SERVICE_QUEUE = "TbServiceQueue";
public static final FutureCallback<Void> CALLBACK = new FutureCallback<Void>() { public static final FutureCallback<Void> CALLBACK = new FutureCallback<Void>() {
@Override @Override
public void onSuccess(@Nullable Void result) { public void onSuccess(@Nullable Void result) {
@ -95,7 +96,13 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS
}); });
ruleEngineStats.getTenantExceptions().forEach((tenantId, e) -> { ruleEngineStats.getTenantExceptions().forEach((tenantId, e) -> {
TsKvEntry tsKv = new BasicTsKvEntry(ts, new JsonDataEntry("ruleEngineException", e.toJsonString())); TsKvEntry tsKv = new BasicTsKvEntry(ts, new JsonDataEntry("ruleEngineException", e.toJsonString()));
try {
tsService.saveAndNotify(tenantId, getServiceAssetId(tenantId, queueName), Collections.singletonList(tsKv), CALLBACK); tsService.saveAndNotify(tenantId, getServiceAssetId(tenantId, queueName), Collections.singletonList(tsKv), CALLBACK);
} catch (DataValidationException e2) {
if (!e2.getMessage().equalsIgnoreCase("Asset is referencing to non-existent tenant!")) {
throw e2;
}
}
}); });
ruleEngineStats.reset(); ruleEngineStats.reset();
} }
@ -113,7 +120,7 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS
asset = new Asset(); asset = new Asset();
asset.setTenantId(tenantId); asset.setTenantId(tenantId);
asset.setName(queueName + "_" + serviceInfoProvider.getServiceId()); asset.setName(queueName + "_" + serviceInfoProvider.getServiceId());
asset.setType("TbServiceQueue"); asset.setType(TB_SERVICE_QUEUE);
asset = assetService.saveAsset(asset); asset = assetService.saveAsset(asset);
} }
assetId = asset.getId(); assetId = asset.getId();

View File

@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink; import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.service.stats.DefaultRuleEngineStatisticsService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -130,7 +131,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
assets.add(doPost("/api/asset", asset, Asset.class)); assets.add(doPost("/api/asset", asset, Asset.class));
} }
List<EntitySubtype> assetTypes = doGetTyped("/api/asset/types", List<EntitySubtype> assetTypes = doGetTyped("/api/asset/types",
new TypeReference<List<EntitySubtype>>(){}); new TypeReference<List<EntitySubtype>>() {
});
Assert.assertNotNull(assetTypes); Assert.assertNotNull(assetTypes);
Assert.assertEquals(3, assetTypes.size()); Assert.assertEquals(3, assetTypes.size());
@ -262,13 +264,16 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
TextPageData<Asset> pageData = null; TextPageData<Asset> pageData = null;
do { do {
pageData = doGetTypedWithPageLink("/api/tenant/assets?", pageData = doGetTypedWithPageLink("/api/tenant/assets?",
new TypeReference<TextPageData<Asset>>(){}, pageLink); new TypeReference<TextPageData<Asset>>() {
}, pageLink);
loadedAssets.addAll(pageData.getData()); loadedAssets.addAll(pageData.getData());
if (pageData.hasNext()) { if (pageData.hasNext()) {
pageLink = pageData.getNextPageLink(); pageLink = pageData.getNextPageLink();
} }
} while (pageData.hasNext()); } while (pageData.hasNext());
loadedAssets.removeIf(asset -> asset.getType().equals(DefaultRuleEngineStatisticsService.TB_SERVICE_QUEUE));
Collections.sort(assets, idComparator); Collections.sort(assets, idComparator);
Collections.sort(loadedAssets, idComparator); Collections.sort(loadedAssets, idComparator);
@ -305,7 +310,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
TextPageData<Asset> pageData = null; TextPageData<Asset> pageData = null;
do { do {
pageData = doGetTypedWithPageLink("/api/tenant/assets?", pageData = doGetTypedWithPageLink("/api/tenant/assets?",
new TypeReference<TextPageData<Asset>>(){}, pageLink); new TypeReference<TextPageData<Asset>>() {
}, pageLink);
loadedAssetsTitle1.addAll(pageData.getData()); loadedAssetsTitle1.addAll(pageData.getData());
if (pageData.hasNext()) { if (pageData.hasNext()) {
pageLink = pageData.getNextPageLink(); pageLink = pageData.getNextPageLink();
@ -321,7 +327,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
pageLink = new TextPageLink(4, title2); pageLink = new TextPageLink(4, title2);
do { do {
pageData = doGetTypedWithPageLink("/api/tenant/assets?", pageData = doGetTypedWithPageLink("/api/tenant/assets?",
new TypeReference<TextPageData<Asset>>(){}, pageLink); new TypeReference<TextPageData<Asset>>() {
}, pageLink);
loadedAssetsTitle2.addAll(pageData.getData()); loadedAssetsTitle2.addAll(pageData.getData());
if (pageData.hasNext()) { if (pageData.hasNext()) {
pageLink = pageData.getNextPageLink(); pageLink = pageData.getNextPageLink();
@ -340,7 +347,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
pageLink = new TextPageLink(4, title1); pageLink = new TextPageLink(4, title1);
pageData = doGetTypedWithPageLink("/api/tenant/assets?", pageData = doGetTypedWithPageLink("/api/tenant/assets?",
new TypeReference<TextPageData<Asset>>(){}, pageLink); new TypeReference<TextPageData<Asset>>() {
}, pageLink);
Assert.assertFalse(pageData.hasNext()); Assert.assertFalse(pageData.hasNext());
Assert.assertEquals(0, pageData.getData().size()); Assert.assertEquals(0, pageData.getData().size());
@ -351,7 +359,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
pageLink = new TextPageLink(4, title2); pageLink = new TextPageLink(4, title2);
pageData = doGetTypedWithPageLink("/api/tenant/assets?", pageData = doGetTypedWithPageLink("/api/tenant/assets?",
new TypeReference<TextPageData<Asset>>(){}, pageLink); new TypeReference<TextPageData<Asset>>() {
}, pageLink);
Assert.assertFalse(pageData.hasNext()); Assert.assertFalse(pageData.hasNext());
Assert.assertEquals(0, pageData.getData().size()); Assert.assertEquals(0, pageData.getData().size());
} }
@ -388,7 +397,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
TextPageData<Asset> pageData = null; TextPageData<Asset> pageData = null;
do { do {
pageData = doGetTypedWithPageLink("/api/tenant/assets?type={type}&", pageData = doGetTypedWithPageLink("/api/tenant/assets?type={type}&",
new TypeReference<TextPageData<Asset>>(){}, pageLink, type1); new TypeReference<TextPageData<Asset>>() {
}, pageLink, type1);
loadedAssetsType1.addAll(pageData.getData()); loadedAssetsType1.addAll(pageData.getData());
if (pageData.hasNext()) { if (pageData.hasNext()) {
pageLink = pageData.getNextPageLink(); pageLink = pageData.getNextPageLink();
@ -404,7 +414,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
pageLink = new TextPageLink(4); pageLink = new TextPageLink(4);
do { do {
pageData = doGetTypedWithPageLink("/api/tenant/assets?type={type}&", pageData = doGetTypedWithPageLink("/api/tenant/assets?type={type}&",
new TypeReference<TextPageData<Asset>>(){}, pageLink, type2); new TypeReference<TextPageData<Asset>>() {
}, pageLink, type2);
loadedAssetsType2.addAll(pageData.getData()); loadedAssetsType2.addAll(pageData.getData());
if (pageData.hasNext()) { if (pageData.hasNext()) {
pageLink = pageData.getNextPageLink(); pageLink = pageData.getNextPageLink();
@ -423,7 +434,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
pageLink = new TextPageLink(4); pageLink = new TextPageLink(4);
pageData = doGetTypedWithPageLink("/api/tenant/assets?type={type}&", pageData = doGetTypedWithPageLink("/api/tenant/assets?type={type}&",
new TypeReference<TextPageData<Asset>>(){}, pageLink, type1); new TypeReference<TextPageData<Asset>>() {
}, pageLink, type1);
Assert.assertFalse(pageData.hasNext()); Assert.assertFalse(pageData.hasNext());
Assert.assertEquals(0, pageData.getData().size()); Assert.assertEquals(0, pageData.getData().size());
@ -434,7 +446,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
pageLink = new TextPageLink(4); pageLink = new TextPageLink(4);
pageData = doGetTypedWithPageLink("/api/tenant/assets?type={type}&", pageData = doGetTypedWithPageLink("/api/tenant/assets?type={type}&",
new TypeReference<TextPageData<Asset>>(){}, pageLink, type2); new TypeReference<TextPageData<Asset>>() {
}, pageLink, type2);
Assert.assertFalse(pageData.hasNext()); Assert.assertFalse(pageData.hasNext());
Assert.assertEquals(0, pageData.getData().size()); Assert.assertEquals(0, pageData.getData().size());
} }
@ -461,7 +474,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
TextPageData<Asset> pageData = null; TextPageData<Asset> pageData = null;
do { do {
pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?", pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?",
new TypeReference<TextPageData<Asset>>(){}, pageLink); new TypeReference<TextPageData<Asset>>() {
}, pageLink);
loadedAssets.addAll(pageData.getData()); loadedAssets.addAll(pageData.getData());
if (pageData.hasNext()) { if (pageData.hasNext()) {
pageLink = pageData.getNextPageLink(); pageLink = pageData.getNextPageLink();
@ -513,7 +527,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
TextPageData<Asset> pageData = null; TextPageData<Asset> pageData = null;
do { do {
pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?", pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?",
new TypeReference<TextPageData<Asset>>(){}, pageLink); new TypeReference<TextPageData<Asset>>() {
}, pageLink);
loadedAssetsTitle1.addAll(pageData.getData()); loadedAssetsTitle1.addAll(pageData.getData());
if (pageData.hasNext()) { if (pageData.hasNext()) {
pageLink = pageData.getNextPageLink(); pageLink = pageData.getNextPageLink();
@ -529,7 +544,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
pageLink = new TextPageLink(4, title2); pageLink = new TextPageLink(4, title2);
do { do {
pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?", pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?",
new TypeReference<TextPageData<Asset>>(){}, pageLink); new TypeReference<TextPageData<Asset>>() {
}, pageLink);
loadedAssetsTitle2.addAll(pageData.getData()); loadedAssetsTitle2.addAll(pageData.getData());
if (pageData.hasNext()) { if (pageData.hasNext()) {
pageLink = pageData.getNextPageLink(); pageLink = pageData.getNextPageLink();
@ -548,7 +564,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
pageLink = new TextPageLink(4, title1); pageLink = new TextPageLink(4, title1);
pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?", pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?",
new TypeReference<TextPageData<Asset>>(){}, pageLink); new TypeReference<TextPageData<Asset>>() {
}, pageLink);
Assert.assertFalse(pageData.hasNext()); Assert.assertFalse(pageData.hasNext());
Assert.assertEquals(0, pageData.getData().size()); Assert.assertEquals(0, pageData.getData().size());
@ -559,7 +576,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
pageLink = new TextPageLink(4, title2); pageLink = new TextPageLink(4, title2);
pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?", pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?",
new TypeReference<TextPageData<Asset>>(){}, pageLink); new TypeReference<TextPageData<Asset>>() {
}, pageLink);
Assert.assertFalse(pageData.hasNext()); Assert.assertFalse(pageData.hasNext());
Assert.assertEquals(0, pageData.getData().size()); Assert.assertEquals(0, pageData.getData().size());
} }
@ -605,7 +623,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
TextPageData<Asset> pageData = null; TextPageData<Asset> pageData = null;
do { do {
pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?type={type}&", pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?type={type}&",
new TypeReference<TextPageData<Asset>>(){}, pageLink, type1); new TypeReference<TextPageData<Asset>>() {
}, pageLink, type1);
loadedAssetsType1.addAll(pageData.getData()); loadedAssetsType1.addAll(pageData.getData());
if (pageData.hasNext()) { if (pageData.hasNext()) {
pageLink = pageData.getNextPageLink(); pageLink = pageData.getNextPageLink();
@ -621,7 +640,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
pageLink = new TextPageLink(4); pageLink = new TextPageLink(4);
do { do {
pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?type={type}&", pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?type={type}&",
new TypeReference<TextPageData<Asset>>(){}, pageLink, type2); new TypeReference<TextPageData<Asset>>() {
}, pageLink, type2);
loadedAssetsType2.addAll(pageData.getData()); loadedAssetsType2.addAll(pageData.getData());
if (pageData.hasNext()) { if (pageData.hasNext()) {
pageLink = pageData.getNextPageLink(); pageLink = pageData.getNextPageLink();
@ -640,7 +660,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
pageLink = new TextPageLink(4); pageLink = new TextPageLink(4);
pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?type={type}&", pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?type={type}&",
new TypeReference<TextPageData<Asset>>(){}, pageLink, type1); new TypeReference<TextPageData<Asset>>() {
}, pageLink, type1);
Assert.assertFalse(pageData.hasNext()); Assert.assertFalse(pageData.hasNext());
Assert.assertEquals(0, pageData.getData().size()); Assert.assertEquals(0, pageData.getData().size());
@ -651,7 +672,8 @@ public abstract class BaseAssetControllerTest extends AbstractControllerTest {
pageLink = new TextPageLink(4); pageLink = new TextPageLink(4);
pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?type={type}&", pageData = doGetTypedWithPageLink("/api/customer/" + customerId.getId().toString() + "/assets?type={type}&",
new TypeReference<TextPageData<Asset>>(){}, pageLink, type2); new TypeReference<TextPageData<Asset>>() {
}, pageLink, type2);
Assert.assertFalse(pageData.hasNext()); Assert.assertFalse(pageData.hasNext());
Assert.assertEquals(0, pageData.getData().size()); Assert.assertEquals(0, pageData.getData().size());
} }

View File

@ -426,7 +426,7 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes
message.setPayload(strKvs.getBytes()); message.setPayload(strKvs.getBytes());
client.publish("v1/devices/me/telemetry", message); client.publish("v1/devices/me/telemetry", message);
Thread.sleep(1000); Thread.sleep(1000);
// client.disconnect(); client.disconnect();
} }
private void awaitConnected(MqttAsyncClient client, long ms) throws InterruptedException { private void awaitConnected(MqttAsyncClient client, long ms) throws InterruptedException {
@ -463,13 +463,13 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes
MqttConnectOptions options = new MqttConnectOptions(); MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(accessToken); options.setUserName(accessToken);
client.connect(options); client.connect(options);
Thread.sleep(3000); awaitConnected(client, TimeUnit.SECONDS.toMillis(30));
MqttMessage message = new MqttMessage(); MqttMessage message = new MqttMessage();
message.setPayload((stringKV).getBytes()); message.setPayload((stringKV).getBytes());
client.publish("v1/devices/me/attributes", message); client.publish("v1/devices/me/attributes", message);
Thread.sleep(1000); Thread.sleep(1000);
client.disconnect();
return new HashSet<>(doGetAsync("/api/plugins/telemetry/DEVICE/" + viewDeviceId + "/keys/attributes", List.class)); return new HashSet<>(doGetAsync("/api/plugins/telemetry/DEVICE/" + viewDeviceId + "/keys/attributes", List.class));
} }

View File

@ -16,10 +16,12 @@
package org.thingsboard.server.controller; package org.thingsboard.server.controller;
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.extensions.cpsuite.ClasspathSuite; import org.junit.extensions.cpsuite.ClasspathSuite;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.thingsboard.server.dao.CustomCassandraCQLUnit; import org.thingsboard.server.dao.CustomCassandraCQLUnit;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import java.util.Arrays; import java.util.Arrays;
@ -37,4 +39,9 @@ public class ControllerNoSqlTestSuite {
new ClassPathCQLDataSet("cassandra/system-data.cql", false, false), new ClassPathCQLDataSet("cassandra/system-data.cql", false, false),
new ClassPathCQLDataSet("cassandra/system-test.cql", false, false)), new ClassPathCQLDataSet("cassandra/system-test.cql", false, false)),
"cassandra-test.yaml", 30000l); "cassandra-test.yaml", 30000l);
@BeforeClass
public static void cleanupInMemStorage(){
InMemoryStorage.getInstance().cleanup();
}
} }

View File

@ -15,10 +15,12 @@
*/ */
package org.thingsboard.server.controller; package org.thingsboard.server.controller;
import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.extensions.cpsuite.ClasspathSuite; import org.junit.extensions.cpsuite.ClasspathSuite;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.thingsboard.server.dao.CustomSqlUnit; import org.thingsboard.server.dao.CustomSqlUnit;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import java.util.Arrays; import java.util.Arrays;
@ -33,4 +35,9 @@ public class ControllerSqlTestSuite {
Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql"), Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql"),
"sql/drop-all-tables.sql", "sql/drop-all-tables.sql",
"sql-test.properties"); "sql-test.properties");
@BeforeClass
public static void cleanupInMemStorage(){
InMemoryStorage.getInstance().cleanup();
}
} }

View File

@ -16,10 +16,12 @@
package org.thingsboard.server.mqtt; package org.thingsboard.server.mqtt;
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.extensions.cpsuite.ClasspathSuite; import org.junit.extensions.cpsuite.ClasspathSuite;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.thingsboard.server.dao.CustomCassandraCQLUnit; import org.thingsboard.server.dao.CustomCassandraCQLUnit;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import java.util.Arrays; import java.util.Arrays;
@ -36,4 +38,9 @@ public class MqttNoSqlTestSuite {
new ClassPathCQLDataSet("cassandra/schema-entities.cql", false, false), new ClassPathCQLDataSet("cassandra/schema-entities.cql", false, false),
new ClassPathCQLDataSet("cassandra/system-data.cql", false, false)), new ClassPathCQLDataSet("cassandra/system-data.cql", false, false)),
"cassandra-test.yaml", 30000l); "cassandra-test.yaml", 30000l);
@BeforeClass
public static void cleanupInMemStorage(){
InMemoryStorage.getInstance().cleanup();
}
} }

View File

@ -15,10 +15,12 @@
*/ */
package org.thingsboard.server.mqtt; package org.thingsboard.server.mqtt;
import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.extensions.cpsuite.ClasspathSuite; import org.junit.extensions.cpsuite.ClasspathSuite;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.thingsboard.server.dao.CustomSqlUnit; import org.thingsboard.server.dao.CustomSqlUnit;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import java.util.Arrays; import java.util.Arrays;
@ -32,4 +34,9 @@ public class MqttSqlTestSuite {
Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"), Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"),
"sql/drop-all-tables.sql", "sql/drop-all-tables.sql",
"sql-test.properties"); "sql-test.properties");
@BeforeClass
public static void cleanupInMemStorage(){
InMemoryStorage.getInstance().cleanup();
}
} }

View File

@ -16,11 +16,13 @@
package org.thingsboard.server.rules; package org.thingsboard.server.rules;
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.extensions.cpsuite.ClasspathSuite; import org.junit.extensions.cpsuite.ClasspathSuite;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.thingsboard.server.dao.CustomCassandraCQLUnit; import org.thingsboard.server.dao.CustomCassandraCQLUnit;
import org.thingsboard.server.dao.CustomSqlUnit; import org.thingsboard.server.dao.CustomSqlUnit;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import java.util.Arrays; import java.util.Arrays;
@ -40,4 +42,9 @@ public class RuleEngineNoSqlTestSuite {
new ClassPathCQLDataSet("cassandra/system-data.cql", false, false)), new ClassPathCQLDataSet("cassandra/system-data.cql", false, false)),
"cassandra-test.yaml", 30000l); "cassandra-test.yaml", 30000l);
@BeforeClass
public static void cleanupInMemStorage(){
InMemoryStorage.getInstance().cleanup();
}
} }

View File

@ -15,10 +15,12 @@
*/ */
package org.thingsboard.server.rules; package org.thingsboard.server.rules;
import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.extensions.cpsuite.ClasspathSuite; import org.junit.extensions.cpsuite.ClasspathSuite;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.thingsboard.server.dao.CustomSqlUnit; import org.thingsboard.server.dao.CustomSqlUnit;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import java.util.Arrays; import java.util.Arrays;
@ -33,4 +35,9 @@ public class RuleEngineSqlTestSuite {
Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"), Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"),
"sql/drop-all-tables.sql", "sql/drop-all-tables.sql",
"sql-test.properties"); "sql-test.properties");
@BeforeClass
public static void cleanupInMemStorage(){
InMemoryStorage.getInstance().cleanup();
}
} }

View File

@ -16,10 +16,12 @@
package org.thingsboard.server.system; package org.thingsboard.server.system;
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.extensions.cpsuite.ClasspathSuite; import org.junit.extensions.cpsuite.ClasspathSuite;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.thingsboard.server.dao.CustomCassandraCQLUnit; import org.thingsboard.server.dao.CustomCassandraCQLUnit;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import java.util.Arrays; import java.util.Arrays;
@ -38,4 +40,9 @@ public class SystemNoSqlTestSuite {
new ClassPathCQLDataSet("cassandra/schema-entities.cql", false, false), new ClassPathCQLDataSet("cassandra/schema-entities.cql", false, false),
new ClassPathCQLDataSet("cassandra/system-data.cql", false, false)), new ClassPathCQLDataSet("cassandra/system-data.cql", false, false)),
"cassandra-test.yaml", 30000l); "cassandra-test.yaml", 30000l);
@BeforeClass
public static void cleanupInMemStorage(){
InMemoryStorage.getInstance().cleanup();
}
} }

View File

@ -15,10 +15,12 @@
*/ */
package org.thingsboard.server.system; package org.thingsboard.server.system;
import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.extensions.cpsuite.ClasspathSuite; import org.junit.extensions.cpsuite.ClasspathSuite;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.thingsboard.server.dao.CustomSqlUnit; import org.thingsboard.server.dao.CustomSqlUnit;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import java.util.Arrays; import java.util.Arrays;
@ -35,4 +37,9 @@ public class SystemSqlTestSuite {
"sql/drop-all-tables.sql", "sql/drop-all-tables.sql",
"sql-test.properties"); "sql-test.properties");
@BeforeClass
public static void cleanupInMemStorage(){
InMemoryStorage.getInstance().cleanup();
}
} }

View File

@ -30,11 +30,9 @@ import java.util.concurrent.TimeUnit;
public final class InMemoryStorage { public final class InMemoryStorage {
private static InMemoryStorage instance; private static InMemoryStorage instance;
private final ConcurrentHashMap<String, BlockingQueue<TbQueueMsg>> storage; private final ConcurrentHashMap<String, BlockingQueue<TbQueueMsg>> storage;
private volatile boolean stopped;
private InMemoryStorage() { private InMemoryStorage() {
storage = new ConcurrentHashMap<>(); storage = new ConcurrentHashMap<>();
stopped = false;
} }
public static InMemoryStorage getInstance() { public static InMemoryStorage getInstance() {
@ -52,9 +50,8 @@ public final class InMemoryStorage {
return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingQueue<>()).add(msg); return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingQueue<>()).add(msg);
} }
public <T extends TbQueueMsg> List<T> get(String topic, long durationInMillis) { public <T extends TbQueueMsg> List<T> get(String topic, long durationInMillis) throws InterruptedException {
if (storage.containsKey(topic)) { if (storage.containsKey(topic)) {
try {
List<T> entities; List<T> entities;
T first = (T) storage.get(topic).poll(durationInMillis, TimeUnit.MILLISECONDS); T first = (T) storage.get(topic).poll(durationInMillis, TimeUnit.MILLISECONDS);
if (first != null) { if (first != null) {
@ -69,16 +66,15 @@ public final class InMemoryStorage {
entities = Collections.emptyList(); entities = Collections.emptyList();
} }
return entities; return entities;
} catch (InterruptedException e) {
if (!stopped) {
log.warn("Queue was interrupted", e);
}
}
} }
return Collections.emptyList(); return Collections.emptyList();
} }
public void stop() { /**
stopped = true; * Used primarily for testing.
*/
public void cleanup() {
storage.clear();
} }
} }

View File

@ -66,7 +66,16 @@ public class InMemoryTbQueueConsumer<T extends TbQueueMsg> implements TbQueueCon
if (subscribed) { if (subscribed) {
List<T> messages = partitions List<T> messages = partitions
.stream() .stream()
.map(tpi -> storage.get(tpi.getFullTopicName(), durationInMillis)) .map(tpi -> {
try {
return storage.get(tpi.getFullTopicName(), durationInMillis);
} catch (InterruptedException e) {
if (!stopped) {
log.error("Queue was interrupted.", e);
}
return Collections.emptyList();
}
})
.flatMap(List::stream) .flatMap(List::stream)
.map(msg -> (T) msg).collect(Collectors.toList()); .map(msg -> (T) msg).collect(Collectors.toList());
if (messages.size() > 0) { if (messages.size() > 0) {

View File

@ -54,7 +54,7 @@
<dependency> <dependency>
<groupId>org.apache.cassandra</groupId> <groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId> <artifactId>cassandra-all</artifactId>
<version>3.11.4</version> <version>3.11.6</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.datastax.cassandra</groupId> <groupId>com.datastax.cassandra</groupId>