added countDownLatch for saving time

This commit is contained in:
Bohdan Smetaniuk 2020-09-24 19:03:02 +03:00
parent afacaf2547
commit 8eda901fee
3 changed files with 85 additions and 63 deletions

View File

@ -45,12 +45,11 @@ import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.controller.AbstractControllerTest;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.edge.imitator.EdgeImitator;
import org.thingsboard.server.gen.edge.EdgeConfiguration;
import org.thingsboard.server.gen.edge.*;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@ -91,7 +90,6 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
edgeImitator = new EdgeImitator("localhost", 7070, edge.getRoutingKey(), edge.getSecret());
edgeImitator.connect();
Thread.sleep(5000);
}
@After
@ -108,7 +106,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
@Test
public void test() throws Exception {
testReceivedData();
testReceivedInitialData();
testDevices();
testAssets();
testRuleChains();
@ -117,8 +115,10 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
testAlarms();
}
private void testReceivedData() throws Exception {
private void testReceivedInitialData() throws Exception {
log.info("Checking received data");
waitForMessages(6); // should be 3, but 3 events from sync service + 3 from controller. will be fixed in next releases
EdgeConfiguration configuration = edgeImitator.getStorage().getConfiguration();
Assert.assertNotNull(configuration);
@ -163,7 +163,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
TimePageData<Device> pageDataDevices = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/devices?",
new TypeReference<TimePageData<Device>>() {}, new TextPageLink(100));
Assert.assertTrue(pageDataDevices.getData().contains(savedDevice));
Thread.sleep(1000);
waitForMessages(1);
Set<UUID> devices = edgeImitator.getStorage().getEntitiesByType(EntityType.DEVICE);
Assert.assertEquals(2, devices.size());
Assert.assertTrue(devices.contains(savedDevice.getUuidId()));
@ -173,7 +173,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
pageDataDevices = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/devices?",
new TypeReference<TimePageData<Device>>() {}, new TextPageLink(100));
Assert.assertFalse(pageDataDevices.getData().contains(savedDevice));
Thread.sleep(1000);
waitForMessages(1);
devices = edgeImitator.getStorage().getEntitiesByType(EntityType.DEVICE);
Assert.assertEquals(1, devices.size());
Assert.assertFalse(devices.contains(savedDevice.getUuidId()));
@ -195,7 +195,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
TimePageData<Asset> pageDataAssets = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/assets?",
new TypeReference<TimePageData<Asset>>() {}, new TextPageLink(100));
Assert.assertTrue(pageDataAssets.getData().contains(savedAsset));
Thread.sleep(1000);
waitForMessages(1);
Set<UUID> assets = edgeImitator.getStorage().getEntitiesByType(EntityType.ASSET);
Assert.assertEquals(2, assets.size());
Assert.assertTrue(assets.contains(savedAsset.getUuidId()));
@ -205,7 +205,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
pageDataAssets = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/assets?",
new TypeReference<TimePageData<Asset>>() {}, new TextPageLink(100));
Assert.assertFalse(pageDataAssets.getData().contains(savedAsset));
Thread.sleep(1000);
waitForMessages(1);
assets = edgeImitator.getStorage().getEntitiesByType(EntityType.ASSET);
Assert.assertEquals(1, assets.size());
Assert.assertFalse(assets.contains(savedAsset.getUuidId()));
@ -227,7 +227,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
TimePageData<RuleChain> pageDataRuleChain = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/ruleChains?",
new TypeReference<TimePageData<RuleChain>>() {}, new TextPageLink(100));
Assert.assertTrue(pageDataRuleChain.getData().contains(savedRuleChain));
Thread.sleep(1000);
waitForMessages(1);
Set<UUID> ruleChains = edgeImitator.getStorage().getEntitiesByType(EntityType.RULE_CHAIN);
Assert.assertEquals(2, ruleChains.size());
Assert.assertTrue(ruleChains.contains(savedRuleChain.getUuidId()));
@ -237,7 +237,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
pageDataRuleChain = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/ruleChains?",
new TypeReference<TimePageData<RuleChain>>() {}, new TextPageLink(100));
Assert.assertFalse(pageDataRuleChain.getData().contains(savedRuleChain));
Thread.sleep(1000);
waitForMessages(1);
ruleChains = edgeImitator.getStorage().getEntitiesByType(EntityType.RULE_CHAIN);
Assert.assertEquals(1, ruleChains.size());
Assert.assertFalse(ruleChains.contains(savedRuleChain.getUuidId()));
@ -259,7 +259,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
TimePageData<DashboardInfo> pageDataDashboard = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/dashboards?",
new TypeReference<TimePageData<DashboardInfo>>() {}, new TextPageLink(100));
Assert.assertTrue(pageDataDashboard.getData().stream().allMatch(dashboardInfo -> dashboardInfo.getUuidId().equals(savedDashboard.getUuidId())));
Thread.sleep(1000);
waitForMessages(1);
Set<UUID> dashboards = edgeImitator.getStorage().getEntitiesByType(EntityType.DASHBOARD);
Assert.assertEquals(1, dashboards.size());
Assert.assertTrue(dashboards.contains(savedDashboard.getUuidId()));
@ -269,7 +269,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
pageDataDashboard = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/dashboards?",
new TypeReference<TimePageData<DashboardInfo>>() {}, new TextPageLink(100));
Assert.assertFalse(pageDataDashboard.getData().stream().anyMatch(dashboardInfo -> dashboardInfo.getUuidId().equals(savedDashboard.getUuidId())));
Thread.sleep(1000);
waitForMessages(1);
dashboards = edgeImitator.getStorage().getEntitiesByType(EntityType.DASHBOARD);
Assert.assertEquals(0, dashboards.size());
Assert.assertFalse(dashboards.contains(savedDashboard.getUuidId()));
@ -280,44 +280,6 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
}
private void installation() throws Exception {
edge = doPost("/api/edge", constructEdge("Test Edge", "test"), Edge.class);
Device device = new Device();
device.setName("Edge Device 1");
device.setType("test");
Device savedDevice = doPost("/api/device", device, Device.class);
doPost("/api/edge/" + edge.getId().getId().toString()
+ "/device/" + savedDevice.getId().getId().toString(), Device.class);
Asset asset = new Asset();
asset.setName("Edge Asset 1");
asset.setType("test");
Asset savedAsset = doPost("/api/asset", asset, Asset.class);
doPost("/api/edge/" + edge.getId().getId().toString()
+ "/asset/" + savedAsset.getId().getId().toString(), Asset.class);
}
private void uninstallation() throws Exception {
TimePageData<Device> pageDataDevices = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/devices?",
new TypeReference<TimePageData<Device>>() {}, new TextPageLink(100));
for (Device device: pageDataDevices.getData()) {
doDelete("/api/device/" + device.getId().getId().toString())
.andExpect(status().isOk());
}
TimePageData<Asset> pageDataAssets = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/assets?",
new TypeReference<TimePageData<Asset>>() {}, new TextPageLink(100));
for (Asset asset: pageDataAssets.getData()) {
doDelete("/api/asset/" + asset.getId().getId().toString())
.andExpect(status().isOk());
}
doDelete("/api/edge/" + edge.getId().getId().toString())
.andExpect(status().isOk());
}
private void testRelations() throws Exception {
log.info("Testing Relations");
List<Device> edgeDevices = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/devices?",
@ -339,7 +301,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
relation.setTypeGroup(RelationTypeGroup.COMMON);
doPost("/api/relation", relation);
Thread.sleep(1000);
waitForMessages(1);
List<EntityRelation> relations = edgeImitator.getStorage().getRelations();
Assert.assertEquals(1, relations.size());
Assert.assertTrue(relations.contains(relation));
@ -352,7 +314,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
"&toType=" + relation.getTo().getEntityType().name())
.andExpect(status().isOk());
Thread.sleep(1000);
waitForMessages(1);
relations = edgeImitator.getStorage().getRelations();
Assert.assertEquals(0, relations.size());
Assert.assertFalse(relations.contains(relation));
@ -376,20 +338,20 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
Alarm savedAlarm = doPost("/api/alarm", alarm, Alarm.class);
AlarmInfo alarmInfo = doGet("/api/alarm/info/" + savedAlarm.getId().getId().toString(), AlarmInfo.class);
Thread.sleep(1000);
waitForMessages(1);
Assert.assertEquals(1, edgeImitator.getStorage().getAlarms().size());
Assert.assertTrue(edgeImitator.getStorage().getAlarms().containsKey(alarmInfo.getType()));
Assert.assertEquals(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()), alarmInfo.getStatus());
doPost("/api/alarm/" + savedAlarm.getId().getId().toString() + "/ack");
Thread.sleep(1000);
waitForMessages(1);
alarmInfo = doGet("/api/alarm/info/" + savedAlarm.getId().getId().toString(), AlarmInfo.class);
Assert.assertTrue(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()).isAck());
Assert.assertEquals(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()), alarmInfo.getStatus());
doPost("/api/alarm/" + savedAlarm.getId().getId().toString() + "/clear");
Thread.sleep(1000);
waitForMessages(1);
alarmInfo = doGet("/api/alarm/info/" + savedAlarm.getId().getId().toString(), AlarmInfo.class);
Assert.assertTrue(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()).isAck());
Assert.assertTrue(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()).isCleared());
@ -400,4 +362,49 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
log.info("Alarms tested successfully");
}
private void installation() throws Exception {
edge = doPost("/api/edge", constructEdge("Test Edge", "test"), Edge.class);
Device device = new Device();
device.setName("Edge Device 1");
device.setType("test");
Device savedDevice = doPost("/api/device", device, Device.class);
doPost("/api/edge/" + edge.getId().getId().toString()
+ "/device/" + savedDevice.getId().getId().toString(), Device.class);
Asset asset = new Asset();
asset.setName("Edge Asset 1");
asset.setType("test");
Asset savedAsset = doPost("/api/asset", asset, Asset.class);
doPost("/api/edge/" + edge.getId().getId().toString()
+ "/asset/" + savedAsset.getId().getId().toString(), Asset.class);
}
private void uninstallation() throws Exception {
TimePageData<Device> pageDataDevices = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/devices?",
new TypeReference<TimePageData<Device>>() {}, new TextPageLink(100));
for (Device device: pageDataDevices.getData()) {
doDelete("/api/device/" + device.getId().getId().toString())
.andExpect(status().isOk());
}
TimePageData<Asset> pageDataAssets = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/assets?",
new TypeReference<TimePageData<Asset>>() {}, new TextPageLink(100));
for (Asset asset: pageDataAssets.getData()) {
doDelete("/api/asset/" + asset.getId().getId().toString())
.andExpect(status().isOk());
}
doDelete("/api/edge/" + edge.getId().getId().toString())
.andExpect(status().isOk());
}
private void waitForMessages(int messageAmount) throws InterruptedException {
edgeImitator.getStorage().setLatch(new CountDownLatch(messageAmount));
while (!edgeImitator.getStorage().getLatch().await(1, TimeUnit.SECONDS)) {
log.warn("Waiting for messages..");
}
}
}

View File

@ -111,6 +111,7 @@ public class EdgeImitator {
}
private ListenableFuture<List<Void>> processDownlinkMsg(DownlinkMsg downlinkMsg) {
log.info(String.valueOf(downlinkMsg));
List<ListenableFuture<Void>> result = new ArrayList<>();
if (downlinkMsg.getDeviceUpdateMsgList() != null && !downlinkMsg.getDeviceUpdateMsgList().isEmpty()) {
for (DeviceUpdateMsg deviceUpdateMsg: downlinkMsg.getDeviceUpdateMsgList()) {

View File

@ -36,6 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
@Slf4j
@ -45,11 +46,14 @@ public class EdgeStorage {
private EdgeConfiguration configuration;
private CountDownLatch latch;
private Map<UUID, EntityType> entities;
private Map<String, AlarmStatus> alarms;
private List<EntityRelation> relations;
public EdgeStorage() {
latch = new CountDownLatch(0);
entities = new HashMap<>();
alarms = new HashMap<>();
relations = new ArrayList<>();
@ -60,15 +64,19 @@ public class EdgeStorage {
case ENTITY_CREATED_RPC_MESSAGE:
case ENTITY_UPDATED_RPC_MESSAGE:
entities.put(uuid, type);
latch.countDown();
break;
case ENTITY_DELETED_RPC_MESSAGE:
entities.remove(uuid);
if (entities.remove(uuid) != null) {
latch.countDown();
}
break;
}
return Futures.immediateFuture(null);
}
public ListenableFuture<Void> processRelation(RelationUpdateMsg relationMsg) {
boolean result = false;
EntityRelation relation = new EntityRelation();
relation.setType(relationMsg.getType());
relation.setTypeGroup(RelationTypeGroup.valueOf(relationMsg.getTypeGroup()));
@ -77,12 +85,15 @@ public class EdgeStorage {
switch (relationMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE:
case ENTITY_UPDATED_RPC_MESSAGE:
relations.add(relation);
result = relations.add(relation);
break;
case ENTITY_DELETED_RPC_MESSAGE:
relations.remove(relation);
result = relations.remove(relation);
break;
}
if (result) {
latch.countDown();
}
return Futures.immediateFuture(null);
}
@ -93,9 +104,12 @@ public class EdgeStorage {
case ALARM_ACK_RPC_MESSAGE:
case ALARM_CLEAR_RPC_MESSAGE:
alarms.put(alarmMsg.getType(), AlarmStatus.valueOf(alarmMsg.getStatus()));
latch.countDown();
break;
case ENTITY_DELETED_RPC_MESSAGE:
alarms.remove(alarmMsg.getName());
if (alarms.remove(alarmMsg.getName()) != null) {
latch.countDown();
}
break;
}
return Futures.immediateFuture(null);