concurrency fixes
This commit is contained in:
parent
199c175396
commit
fd32ba1761
@ -67,9 +67,6 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
private EdgeImitator edgeImitator;
|
||||
private Edge edge;
|
||||
|
||||
@Autowired
|
||||
RuleChainService ruleChainService;
|
||||
|
||||
@Before
|
||||
public void beforeTest() throws Exception {
|
||||
loginSysAdmin();
|
||||
@ -88,10 +85,11 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
tenantAdmin.setLastName("Downs");
|
||||
|
||||
tenantAdmin = createUserAndLogin(tenantAdmin, "testPassword1");
|
||||
|
||||
installation();
|
||||
|
||||
edgeImitator = new EdgeImitator("localhost", 7070, edge.getRoutingKey(), edge.getSecret());
|
||||
// should be 3, but 3 events from sync service + 3 from controller. will be fixed in next releases
|
||||
edgeImitator.getStorage().expectMessageAmount(6);
|
||||
edgeImitator.connect();
|
||||
}
|
||||
|
||||
@ -120,7 +118,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
|
||||
private void testReceivedInitialData() throws Exception {
|
||||
log.info("Checking received data");
|
||||
waitForMessages(6); // should be 3, but 3 events from sync service + 3 from controller. will be fixed in next releases
|
||||
edgeImitator.getStorage().waitForMessages();
|
||||
|
||||
EdgeConfiguration configuration = edgeImitator.getStorage().getConfiguration();
|
||||
Assert.assertNotNull(configuration);
|
||||
@ -160,23 +158,25 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
device.setName("Edge Device 2");
|
||||
device.setType("test");
|
||||
Device savedDevice = doPost("/api/device", device, Device.class);
|
||||
edgeImitator.getStorage().expectMessageAmount(1);
|
||||
doPost("/api/edge/" + edge.getId().getId().toString()
|
||||
+ "/device/" + savedDevice.getId().getId().toString(), Device.class);
|
||||
|
||||
TimePageData<Device> pageDataDevices = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/devices?",
|
||||
new TypeReference<TimePageData<Device>>() {}, new TextPageLink(100));
|
||||
Assert.assertTrue(pageDataDevices.getData().contains(savedDevice));
|
||||
waitForMessages(1);
|
||||
edgeImitator.getStorage().waitForMessages();
|
||||
Set<UUID> devices = edgeImitator.getStorage().getEntitiesByType(EntityType.DEVICE);
|
||||
Assert.assertEquals(2, devices.size());
|
||||
Assert.assertTrue(devices.contains(savedDevice.getUuidId()));
|
||||
|
||||
edgeImitator.getStorage().expectMessageAmount(1);
|
||||
doDelete("/api/edge/" + edge.getId().getId().toString()
|
||||
+ "/device/" + savedDevice.getId().getId().toString(), Device.class);
|
||||
pageDataDevices = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/devices?",
|
||||
new TypeReference<TimePageData<Device>>() {}, new TextPageLink(100));
|
||||
Assert.assertFalse(pageDataDevices.getData().contains(savedDevice));
|
||||
waitForMessages(1);
|
||||
edgeImitator.getStorage().waitForMessages();
|
||||
devices = edgeImitator.getStorage().getEntitiesByType(EntityType.DEVICE);
|
||||
Assert.assertEquals(1, devices.size());
|
||||
Assert.assertFalse(devices.contains(savedDevice.getUuidId()));
|
||||
@ -192,23 +192,25 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
asset.setName("Edge Asset 2");
|
||||
asset.setType("test");
|
||||
Asset savedAsset = doPost("/api/asset", asset, Asset.class);
|
||||
edgeImitator.getStorage().expectMessageAmount(1);
|
||||
doPost("/api/edge/" + edge.getId().getId().toString()
|
||||
+ "/asset/" + savedAsset.getId().getId().toString(), Asset.class);
|
||||
|
||||
TimePageData<Asset> pageDataAssets = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/assets?",
|
||||
new TypeReference<TimePageData<Asset>>() {}, new TextPageLink(100));
|
||||
Assert.assertTrue(pageDataAssets.getData().contains(savedAsset));
|
||||
waitForMessages(1);
|
||||
edgeImitator.getStorage().waitForMessages();
|
||||
Set<UUID> assets = edgeImitator.getStorage().getEntitiesByType(EntityType.ASSET);
|
||||
Assert.assertEquals(2, assets.size());
|
||||
Assert.assertTrue(assets.contains(savedAsset.getUuidId()));
|
||||
|
||||
edgeImitator.getStorage().expectMessageAmount(1);
|
||||
doDelete("/api/edge/" + edge.getId().getId().toString()
|
||||
+ "/asset/" + savedAsset.getId().getId().toString(), Asset.class);
|
||||
pageDataAssets = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/assets?",
|
||||
new TypeReference<TimePageData<Asset>>() {}, new TextPageLink(100));
|
||||
Assert.assertFalse(pageDataAssets.getData().contains(savedAsset));
|
||||
waitForMessages(1);
|
||||
edgeImitator.getStorage().waitForMessages();
|
||||
assets = edgeImitator.getStorage().getEntitiesByType(EntityType.ASSET);
|
||||
Assert.assertEquals(1, assets.size());
|
||||
Assert.assertFalse(assets.contains(savedAsset.getUuidId()));
|
||||
@ -224,23 +226,25 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
ruleChain.setName("Edge Test Rule Chain");
|
||||
ruleChain.setType(RuleChainType.EDGE);
|
||||
RuleChain savedRuleChain = doPost("/api/ruleChain", ruleChain, RuleChain.class);
|
||||
edgeImitator.getStorage().expectMessageAmount(1);
|
||||
doPost("/api/edge/" + edge.getId().getId().toString()
|
||||
+ "/ruleChain/" + savedRuleChain.getId().getId().toString(), RuleChain.class);
|
||||
|
||||
TimePageData<RuleChain> pageDataRuleChain = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/ruleChains?",
|
||||
new TypeReference<TimePageData<RuleChain>>() {}, new TextPageLink(100));
|
||||
Assert.assertTrue(pageDataRuleChain.getData().contains(savedRuleChain));
|
||||
waitForMessages(1);
|
||||
edgeImitator.getStorage().waitForMessages();
|
||||
Set<UUID> ruleChains = edgeImitator.getStorage().getEntitiesByType(EntityType.RULE_CHAIN);
|
||||
Assert.assertEquals(2, ruleChains.size());
|
||||
Assert.assertTrue(ruleChains.contains(savedRuleChain.getUuidId()));
|
||||
|
||||
edgeImitator.getStorage().expectMessageAmount(1);
|
||||
doDelete("/api/edge/" + edge.getId().getId().toString()
|
||||
+ "/ruleChain/" + savedRuleChain.getId().getId().toString(), RuleChain.class);
|
||||
pageDataRuleChain = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/ruleChains?",
|
||||
new TypeReference<TimePageData<RuleChain>>() {}, new TextPageLink(100));
|
||||
Assert.assertFalse(pageDataRuleChain.getData().contains(savedRuleChain));
|
||||
waitForMessages(1);
|
||||
edgeImitator.getStorage().waitForMessages();
|
||||
ruleChains = edgeImitator.getStorage().getEntitiesByType(EntityType.RULE_CHAIN);
|
||||
Assert.assertEquals(1, ruleChains.size());
|
||||
Assert.assertFalse(ruleChains.contains(savedRuleChain.getUuidId()));
|
||||
@ -256,23 +260,25 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
Dashboard dashboard = new Dashboard();
|
||||
dashboard.setTitle("Edge Test Dashboard");
|
||||
Dashboard savedDashboard = doPost("/api/dashboard", dashboard, Dashboard.class);
|
||||
edgeImitator.getStorage().expectMessageAmount(1);
|
||||
doPost("/api/edge/" + edge.getId().getId().toString()
|
||||
+ "/dashboard/" + savedDashboard.getId().getId().toString(), Dashboard.class);
|
||||
|
||||
TimePageData<DashboardInfo> pageDataDashboard = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/dashboards?",
|
||||
new TypeReference<TimePageData<DashboardInfo>>() {}, new TextPageLink(100));
|
||||
Assert.assertTrue(pageDataDashboard.getData().stream().allMatch(dashboardInfo -> dashboardInfo.getUuidId().equals(savedDashboard.getUuidId())));
|
||||
waitForMessages(1);
|
||||
edgeImitator.getStorage().waitForMessages();
|
||||
Set<UUID> dashboards = edgeImitator.getStorage().getEntitiesByType(EntityType.DASHBOARD);
|
||||
Assert.assertEquals(1, dashboards.size());
|
||||
Assert.assertTrue(dashboards.contains(savedDashboard.getUuidId()));
|
||||
|
||||
edgeImitator.getStorage().expectMessageAmount(1);
|
||||
doDelete("/api/edge/" + edge.getId().getId().toString()
|
||||
+ "/dashboard/" + savedDashboard.getId().getId().toString(), Dashboard.class);
|
||||
pageDataDashboard = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/dashboards?",
|
||||
new TypeReference<TimePageData<DashboardInfo>>() {}, new TextPageLink(100));
|
||||
Assert.assertFalse(pageDataDashboard.getData().stream().anyMatch(dashboardInfo -> dashboardInfo.getUuidId().equals(savedDashboard.getUuidId())));
|
||||
waitForMessages(1);
|
||||
edgeImitator.getStorage().waitForMessages();
|
||||
dashboards = edgeImitator.getStorage().getEntitiesByType(EntityType.DASHBOARD);
|
||||
Assert.assertEquals(0, dashboards.size());
|
||||
Assert.assertFalse(dashboards.contains(savedDashboard.getUuidId()));
|
||||
@ -302,12 +308,14 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
relation.setFrom(device.getId());
|
||||
relation.setTo(asset.getId());
|
||||
relation.setTypeGroup(RelationTypeGroup.COMMON);
|
||||
edgeImitator.getStorage().expectMessageAmount(1);
|
||||
doPost("/api/relation", relation);
|
||||
|
||||
waitForMessages(1);
|
||||
edgeImitator.getStorage().waitForMessages();
|
||||
List<EntityRelation> relations = edgeImitator.getStorage().getRelations();
|
||||
Assert.assertEquals(1, relations.size());
|
||||
Assert.assertTrue(relations.contains(relation));
|
||||
edgeImitator.getStorage().expectMessageAmount(1);
|
||||
doDelete("/api/relation?" +
|
||||
"fromId=" + relation.getFrom().getId().toString() +
|
||||
"&fromType=" + relation.getFrom().getEntityType().name() +
|
||||
@ -317,7 +325,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
"&toType=" + relation.getTo().getEntityType().name())
|
||||
.andExpect(status().isOk());
|
||||
|
||||
waitForMessages(1);
|
||||
edgeImitator.getStorage().waitForMessages();
|
||||
relations = edgeImitator.getStorage().getRelations();
|
||||
Assert.assertEquals(0, relations.size());
|
||||
Assert.assertFalse(relations.contains(relation));
|
||||
@ -339,22 +347,25 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
alarm.setType("alarm");
|
||||
alarm.setSeverity(AlarmSeverity.CRITICAL);
|
||||
|
||||
edgeImitator.getStorage().expectMessageAmount(1);
|
||||
Alarm savedAlarm = doPost("/api/alarm", alarm, Alarm.class);
|
||||
AlarmInfo alarmInfo = doGet("/api/alarm/info/" + savedAlarm.getId().getId().toString(), AlarmInfo.class);
|
||||
waitForMessages(1);
|
||||
edgeImitator.getStorage().waitForMessages();
|
||||
|
||||
Assert.assertEquals(1, edgeImitator.getStorage().getAlarms().size());
|
||||
Assert.assertTrue(edgeImitator.getStorage().getAlarms().containsKey(alarmInfo.getType()));
|
||||
Assert.assertEquals(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()), alarmInfo.getStatus());
|
||||
edgeImitator.getStorage().expectMessageAmount(1);
|
||||
doPost("/api/alarm/" + savedAlarm.getId().getId().toString() + "/ack");
|
||||
|
||||
waitForMessages(1);
|
||||
edgeImitator.getStorage().waitForMessages();
|
||||
alarmInfo = doGet("/api/alarm/info/" + savedAlarm.getId().getId().toString(), AlarmInfo.class);
|
||||
Assert.assertTrue(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()).isAck());
|
||||
Assert.assertEquals(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()), alarmInfo.getStatus());
|
||||
edgeImitator.getStorage().expectMessageAmount(1);
|
||||
doPost("/api/alarm/" + savedAlarm.getId().getId().toString() + "/clear");
|
||||
|
||||
waitForMessages(1);
|
||||
edgeImitator.getStorage().waitForMessages();
|
||||
alarmInfo = doGet("/api/alarm/info/" + savedAlarm.getId().getId().toString(), AlarmInfo.class);
|
||||
Assert.assertTrue(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()).isAck());
|
||||
Assert.assertTrue(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()).isCleared());
|
||||
@ -402,12 +413,4 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
doDelete("/api/edge/" + edge.getId().getId().toString())
|
||||
.andExpect(status().isOk());
|
||||
}
|
||||
|
||||
private void waitForMessages(int messageAmount) throws InterruptedException {
|
||||
edgeImitator.getStorage().setLatch(new CountDownLatch(messageAmount));
|
||||
while (!edgeImitator.getStorage().getLatch().await(1, TimeUnit.SECONDS)) {
|
||||
log.warn("Waiting for messages..");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -37,6 +37,7 @@ import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@ -121,4 +122,12 @@ public class EdgeStorage {
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)).keySet();
|
||||
}
|
||||
|
||||
public void waitForMessages() throws InterruptedException {
|
||||
latch.await(5, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public void expectMessageAmount(int messageAmount) {
|
||||
latch = new CountDownLatch(messageAmount);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user