edge services code coverage

This commit is contained in:
Bohdan Smetaniuk 2020-09-22 12:41:49 +03:00
parent a1db669a9d
commit a6ae072bc9
7 changed files with 757 additions and 2 deletions

View File

@ -588,7 +588,7 @@ transport:
# Edges parameters
edges:
rpc:
enabled: "${EDGES_RPC_ENABLED:false}"
enabled: "${EDGES_RPC_ENABLED:true}"
port: "${EDGES_RPC_PORT:7070}"
ssl:
# Enable/disable SSL support

View File

@ -94,6 +94,11 @@
<groupId>org.thingsboard</groupId>
<artifactId>rest-client</artifactId>
</dependency>
<dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>edge-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -31,7 +31,7 @@ import java.util.List;
import java.util.Map;
@RunWith(ClasspathSuite.class)
@ClasspathSuite.ClassnameFilters({"org.thingsboard.server.msa.*Test"})
@ClasspathSuite.ClassnameFilters({"org.thingsboard.server.msa.*EdgeTest"})
public class ContainerTestSuite {
private static DockerComposeContainer testContainer;

View File

@ -0,0 +1,156 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.msa.edge;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.thingsboard.edge.rpc.EdgeGrpcClient;
import org.thingsboard.edge.rpc.EdgeRpcClient;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.gen.edge.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.AssetUpdateMsg;
import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.DownlinkResponseMsg;
import org.thingsboard.server.gen.edge.EdgeConfiguration;
import org.thingsboard.server.gen.edge.EntityDataProto;
import org.thingsboard.server.gen.edge.RelationUpdateMsg;
import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
import org.thingsboard.server.gen.edge.UplinkResponseMsg;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@Slf4j
public class EdgeImitator {
private String routingKey;
private String routingSecret;
private EdgeRpcClient edgeRpcClient;
@Getter
private EdgeStorage storage;
public EdgeImitator(String host, int port, String routingKey, String routingSecret) throws NoSuchFieldException, IllegalAccessException {
edgeRpcClient = new EdgeGrpcClient();
storage = new EdgeStorage();
this.routingKey = routingKey;
this.routingSecret = routingSecret;
setEdgeCredentials("rpcHost", host);
setEdgeCredentials("rpcPort", port);
}
private void setEdgeCredentials(String fieldName, Object value) throws NoSuchFieldException, IllegalAccessException {
Field fieldToSet = edgeRpcClient.getClass().getDeclaredField(fieldName);
fieldToSet.setAccessible(true);
fieldToSet.set(edgeRpcClient, value);
fieldToSet.setAccessible(false);
}
public void connect() {
edgeRpcClient.connect(routingKey, routingSecret,
this::onUplinkResponse,
this::onEdgeUpdate,
this::onDownlink,
this::onError);
}
public void disconnect() throws InterruptedException {
edgeRpcClient.disconnect();
}
private void onUplinkResponse(UplinkResponseMsg msg) {
log.info("onUplinkResponse: {}", msg);
}
private void onEdgeUpdate(EdgeConfiguration edgeConfiguration) {
storage.setConfiguration(edgeConfiguration);
}
private void onDownlink(DownlinkMsg downlinkMsg) {
ListenableFuture<List<Void>> future = processDownlinkMsg(downlinkMsg);
Futures.addCallback(future, new FutureCallback<List<Void>>() {
@Override
public void onSuccess(@Nullable List<Void> result) {
DownlinkResponseMsg downlinkResponseMsg = DownlinkResponseMsg.newBuilder().setSuccess(true).build();
edgeRpcClient.sendDownlinkResponseMsg(downlinkResponseMsg);
}
@Override
public void onFailure(Throwable t) {
DownlinkResponseMsg downlinkResponseMsg = DownlinkResponseMsg.newBuilder().setSuccess(false).setErrorMsg(t.getMessage()).build();
edgeRpcClient.sendDownlinkResponseMsg(downlinkResponseMsg);
}
}, MoreExecutors.directExecutor());
}
private void onError(Exception e) {
log.error("Error during Edge lifecycle: ", e);
}
private ListenableFuture<List<Void>> processDownlinkMsg(DownlinkMsg downlinkMsg) {
List<ListenableFuture<Void>> result = new ArrayList<>();
if (downlinkMsg.getDeviceUpdateMsgList() != null && !downlinkMsg.getDeviceUpdateMsgList().isEmpty()) {
for (DeviceUpdateMsg deviceUpdateMsg: downlinkMsg.getDeviceUpdateMsgList()) {
result.add(storage.processEntity(deviceUpdateMsg.getMsgType(), EdgeEventType.DEVICE, new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())));
}
}
if (downlinkMsg.getAssetUpdateMsgList() != null && !downlinkMsg.getAssetUpdateMsgList().isEmpty()) {
for (AssetUpdateMsg assetUpdateMsg: downlinkMsg.getAssetUpdateMsgList()) {
result.add(storage.processEntity(assetUpdateMsg.getMsgType(), EdgeEventType.ASSET, new UUID(assetUpdateMsg.getIdMSB(), assetUpdateMsg.getIdLSB())));
}
}
if (downlinkMsg.getRuleChainUpdateMsgList() != null && !downlinkMsg.getRuleChainUpdateMsgList().isEmpty()) {
for (RuleChainUpdateMsg ruleChainUpdateMsg: downlinkMsg.getRuleChainUpdateMsgList()) {
result.add(storage.processEntity(ruleChainUpdateMsg.getMsgType(), EdgeEventType.RULE_CHAIN, new UUID(ruleChainUpdateMsg.getIdMSB(), ruleChainUpdateMsg.getIdLSB())));
}
}
if (downlinkMsg.getDashboardUpdateMsgList() != null && !downlinkMsg.getDashboardUpdateMsgList().isEmpty()) {
for (DashboardUpdateMsg dashboardUpdateMsg: downlinkMsg.getDashboardUpdateMsgList()) {
result.add(storage.processEntity(dashboardUpdateMsg.getMsgType(), EdgeEventType.DASHBOARD, new UUID(dashboardUpdateMsg.getIdMSB(), dashboardUpdateMsg.getIdLSB())));
}
}
if (downlinkMsg.getRelationUpdateMsgList() != null && !downlinkMsg.getRelationUpdateMsgList().isEmpty()) {
for (RelationUpdateMsg relationUpdateMsg: downlinkMsg.getRelationUpdateMsgList()) {
result.add(storage.processRelation(relationUpdateMsg));
}
}
if (downlinkMsg.getAlarmUpdateMsgList() != null && !downlinkMsg.getAlarmUpdateMsgList().isEmpty()) {
for (AlarmUpdateMsg alarmUpdateMsg: downlinkMsg.getAlarmUpdateMsgList()) {
result.add(storage.processAlarm(alarmUpdateMsg));
}
}
if (downlinkMsg.getEntityDataList() != null && !downlinkMsg.getEntityDataList().isEmpty()) {
for (EntityDataProto entityDataProto: downlinkMsg.getEntityDataList()) {
if (entityDataProto.hasPostTelemetryMsg()) {
result.add(storage.processTelemetry(new UUID(entityDataProto.getEntityIdMSB(), entityDataProto.getEntityIdLSB()), entityDataProto.getPostTelemetryMsg()));
}
}
}
return Futures.allAsList(result);
}
}

View File

@ -0,0 +1,120 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.msa.edge;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.alarm.AlarmStatus;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.gen.edge.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.EdgeConfiguration;
import org.thingsboard.server.gen.edge.RelationUpdateMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@Slf4j
@Getter
@Setter
public class EdgeStorage {
private EdgeConfiguration configuration;
private Map<UUID, EdgeEventType> entities;
private Map<String, AlarmStatus> alarms;
private List<EntityRelation> relations;
private Map<UUID, TransportProtos.PostTelemetryMsg> latestTelemetry;
public EdgeStorage() {
entities = new HashMap<>();
alarms = new HashMap<>();
relations = new ArrayList<>();
latestTelemetry = new HashMap<>();
}
public ListenableFuture<Void> processEntity(UpdateMsgType msgType, EdgeEventType type, UUID uuid) {
switch (msgType) {
case ENTITY_CREATED_RPC_MESSAGE:
case ENTITY_UPDATED_RPC_MESSAGE:
entities.put(uuid, type);
break;
case ENTITY_DELETED_RPC_MESSAGE:
entities.remove(uuid);
break;
}
return Futures.immediateFuture(null);
}
public ListenableFuture<Void> processRelation(RelationUpdateMsg relationMsg) {
EntityRelation relation = new EntityRelation();
relation.setType(relationMsg.getType());
relation.setTypeGroup(RelationTypeGroup.valueOf(relationMsg.getTypeGroup()));
relation.setTo(EntityIdFactory.getByTypeAndUuid(relationMsg.getToEntityType(), new UUID(relationMsg.getToIdMSB(), relationMsg.getToIdLSB())));
relation.setFrom(EntityIdFactory.getByTypeAndUuid(relationMsg.getFromEntityType(), new UUID(relationMsg.getFromIdMSB(), relationMsg.getFromIdLSB())));
switch (relationMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE:
case ENTITY_UPDATED_RPC_MESSAGE:
relations.add(relation);
break;
case ENTITY_DELETED_RPC_MESSAGE:
relations.remove(relation);
break;
}
return Futures.immediateFuture(null);
}
public ListenableFuture<Void> processAlarm(AlarmUpdateMsg alarmMsg) {
switch (alarmMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE:
case ENTITY_UPDATED_RPC_MESSAGE:
case ALARM_ACK_RPC_MESSAGE:
case ALARM_CLEAR_RPC_MESSAGE:
alarms.put(alarmMsg.getType(), AlarmStatus.valueOf(alarmMsg.getStatus()));
break;
case ENTITY_DELETED_RPC_MESSAGE:
alarms.remove(alarmMsg.getName());
break;
}
return Futures.immediateFuture(null);
}
public ListenableFuture<Void> processTelemetry(UUID uuid, TransportProtos.PostTelemetryMsg telemetryMsg) {
latestTelemetry.put(uuid, telemetryMsg);
return Futures.immediateFuture(null);
}
public Set<UUID> getEntitiesByType(EdgeEventType type) {
Map<UUID, EdgeEventType> filtered = entities.entrySet().stream()
.filter(entry -> entry.getValue().equals(type))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return filtered.keySet();
}
}

View File

@ -0,0 +1,341 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.msa.edge;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.junit.*;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import org.thingsboard.server.common.data.alarm.AlarmStatus;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.gen.edge.EdgeConfiguration;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.msa.AbstractContainerTest;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@Slf4j
public class EdgeTest extends AbstractContainerTest {
private static EdgeImitator edgeImitator;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, IOException {
restClient.login("tenant@thingsboard.org", "tenant");
installation();
edgeImitator = new EdgeImitator("localhost", 7070, "routing", "secret");
edgeImitator.connect();
Thread.sleep(10000);
}
@Test
public void testReceivedData() {
Edge edge = restClient.getTenantEdge("Edge1").get();
EdgeConfiguration configuration = edgeImitator.getStorage().getConfiguration();
Assert.assertNotNull(configuration);
Map<UUID, EdgeEventType> entities = edgeImitator.getStorage().getEntities();
Assert.assertFalse(entities.isEmpty());
Set<UUID> devices = edgeImitator.getStorage().getEntitiesByType(EdgeEventType.DEVICE);
Assert.assertEquals(1, devices.size());
for (Device device: restClient.getEdgeDevices(edge.getId(), new TextPageLink(1)).getData()) {
Assert.assertTrue(devices.contains(device.getUuidId()));
}
Set<UUID> ruleChains = edgeImitator.getStorage().getEntitiesByType(EdgeEventType.RULE_CHAIN);
Assert.assertEquals(1, ruleChains.size());
for (RuleChain ruleChain: restClient.getEdgeRuleChains(edge.getId(), new TimePageLink(1)).getData()) {
Assert.assertTrue(ruleChains.contains(ruleChain.getUuidId()));
}
Set<UUID> assets = edgeImitator.getStorage().getEntitiesByType(EdgeEventType.ASSET);
Assert.assertEquals(1, assets.size());
for (Asset asset: restClient.getEdgeAssets(edge.getId(), new TextPageLink(1)).getData()) {
Assert.assertTrue(assets.contains(asset.getUuidId()));
}
}
@Test
public void testDevices() throws Exception {
Edge edge = restClient.getTenantEdge("Edge1").get();
Device device = new Device();
device.setName("Edge Device 2");
device.setType("test");
Device savedDevice = restClient.saveDevice(device);
restClient.assignDeviceToEdge(edge.getId(), savedDevice.getId());
Thread.sleep(1000);
Assert.assertTrue(restClient.getEdgeDevices(edge.getId(), new TextPageLink(2)).getData().contains(savedDevice));
Set<UUID> devices = edgeImitator.getStorage().getEntitiesByType(EdgeEventType.DEVICE);
Assert.assertEquals(2, devices.size());
Assert.assertTrue(devices.contains(savedDevice.getUuidId()));
restClient.unassignDeviceFromEdge(edge.getId(), savedDevice.getId());
Thread.sleep(1000);
Assert.assertFalse(restClient.getEdgeDevices(edge.getId(), new TextPageLink(2)).getData().contains(savedDevice));
devices = edgeImitator.getStorage().getEntitiesByType(EdgeEventType.DEVICE);
Assert.assertEquals(1, devices.size());
Assert.assertFalse(devices.contains(savedDevice.getUuidId()));
restClient.deleteDevice(savedDevice.getId());
}
@Test
public void testAssets() throws Exception {
Edge edge = restClient.getTenantEdge("Edge1").get();
Asset asset = new Asset();
asset.setName("Edge Asset 2");
asset.setType("test");
Asset savedAsset = restClient.saveAsset(asset);
restClient.assignAssetToEdge(edge.getId(), savedAsset.getId());
Thread.sleep(1000);
Assert.assertTrue(restClient.getEdgeAssets(edge.getId(), new TextPageLink(2)).getData().contains(savedAsset));
Set<UUID> assets = edgeImitator.getStorage().getEntitiesByType(EdgeEventType.ASSET);
Assert.assertEquals(2, assets.size());
Assert.assertTrue(assets.contains(savedAsset.getUuidId()));
restClient.unassignAssetFromEdge(edge.getId(), savedAsset.getId());
Thread.sleep(1000);
Assert.assertFalse(restClient.getEdgeAssets(edge.getId(), new TextPageLink(2)).getData().contains(savedAsset));
assets = edgeImitator.getStorage().getEntitiesByType(EdgeEventType.ASSET);
Assert.assertEquals(1, assets.size());
Assert.assertFalse(assets.contains(savedAsset.getUuidId()));
restClient.deleteAsset(savedAsset.getId());
}
@Test
public void testRuleChains() throws Exception {
Edge edge = restClient.getTenantEdge("Edge1").get();
RuleChain ruleChain = new RuleChain();
ruleChain.setName("Edge Test Rule Chain");
ruleChain.setType(RuleChainType.EDGE);
RuleChain savedRuleChain = restClient.saveRuleChain(ruleChain);
restClient.assignRuleChainToEdge(edge.getId(), savedRuleChain.getId());
Thread.sleep(1000);
Assert.assertTrue(restClient.getEdgeRuleChains(edge.getId(), new TimePageLink(2)).getData().contains(savedRuleChain));
Set<UUID> ruleChains = edgeImitator.getStorage().getEntitiesByType(EdgeEventType.RULE_CHAIN);
Assert.assertEquals(2, ruleChains.size());
Assert.assertTrue(ruleChains.contains(savedRuleChain.getUuidId()));
restClient.unassignRuleChainFromEdge(edge.getId(), savedRuleChain.getId());
Thread.sleep(1000);
Assert.assertFalse(restClient.getEdgeRuleChains(edge.getId(), new TimePageLink(2)).getData().contains(savedRuleChain));
ruleChains = edgeImitator.getStorage().getEntitiesByType(EdgeEventType.RULE_CHAIN);
Assert.assertEquals(1, ruleChains.size());
Assert.assertFalse(ruleChains.contains(savedRuleChain.getUuidId()));
restClient.deleteRuleChain(savedRuleChain.getId());
}
@Test
public void testDashboards() throws Exception {
Edge edge = restClient.getTenantEdge("Edge1").get();
Dashboard dashboard = new Dashboard();
dashboard.setTitle("Edge Test Dashboard");
Dashboard savedDashboard = restClient.saveDashboard(dashboard);
restClient.assignDashboardToEdge(edge.getId(), savedDashboard.getId());
Thread.sleep(1000);
Assert.assertTrue(restClient.getEdgeDashboards(edge.getId(), new TimePageLink(2)).getData().stream().allMatch(dashboardInfo -> dashboardInfo.getUuidId().equals(savedDashboard.getUuidId())));
Set<UUID> dashboards = edgeImitator.getStorage().getEntitiesByType(EdgeEventType.DASHBOARD);
Assert.assertEquals(1, dashboards.size());
Assert.assertTrue(dashboards.contains(savedDashboard.getUuidId()));
restClient.unassignDashboardFromEdge(edge.getId(), savedDashboard.getId());
Thread.sleep(1000);
Assert.assertFalse(restClient.getEdgeDashboards(edge.getId(), new TimePageLink(2)).getData().stream().anyMatch(dashboardInfo -> dashboardInfo.getUuidId().equals(savedDashboard.getUuidId())));
dashboards = edgeImitator.getStorage().getEntitiesByType(EdgeEventType.DASHBOARD);
Assert.assertEquals(0, dashboards.size());
Assert.assertFalse(dashboards.contains(savedDashboard.getUuidId()));
restClient.deleteDashboard(savedDashboard.getId());
}
@Test
public void testRelations() throws InterruptedException {
Device device = restClient.getTenantDevice("Edge Device 1").get();
Asset asset = restClient.getTenantAsset("Edge Asset 1").get();
EntityRelation relation = new EntityRelation();
relation.setType("test");
relation.setFrom(device.getId());
relation.setTo(asset.getId());
relation.setTypeGroup(RelationTypeGroup.COMMON);
restClient.saveRelation(relation);
Thread.sleep(1000);
List<EntityRelation> relations = edgeImitator.getStorage().getRelations();
Assert.assertEquals(1, relations.size());
Assert.assertTrue(relations.contains(relation));
restClient.deleteRelation(relation.getFrom(), relation.getType(), relation.getTypeGroup(), relation.getTo());
Thread.sleep(1000);
relations = edgeImitator.getStorage().getRelations();
Assert.assertEquals(0, relations.size());
Assert.assertFalse(relations.contains(relation));
}
@Test
public void testAlarms() throws Exception {
Device device = restClient.getTenantDevice("Edge Device 1").get();
Alarm alarm = new Alarm();
alarm.setOriginator(device.getId());
alarm.setStatus(AlarmStatus.ACTIVE_UNACK);
alarm.setType("alarm");
alarm.setSeverity(AlarmSeverity.CRITICAL);
Alarm savedAlarm = restClient.saveAlarm(alarm);
AlarmInfo alarmInfo = restClient.getAlarmInfoById(savedAlarm.getId()).get();
Thread.sleep(1000);
Assert.assertEquals(1, edgeImitator.getStorage().getAlarms().size());
Assert.assertTrue(edgeImitator.getStorage().getAlarms().containsKey(alarmInfo.getType()));
Assert.assertEquals(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()), alarmInfo.getStatus());
restClient.ackAlarm(savedAlarm.getId());
Thread.sleep(1000);
alarmInfo = restClient.getAlarmInfoById(savedAlarm.getId()).get();
Assert.assertTrue(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()).isAck());
Assert.assertEquals(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()), alarmInfo.getStatus());
restClient.clearAlarm(savedAlarm.getId());
Thread.sleep(1000);
alarmInfo = restClient.getAlarmInfoById(savedAlarm.getId()).get();
Assert.assertTrue(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()).isAck());
Assert.assertTrue(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()).isCleared());
Assert.assertEquals(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()), alarmInfo.getStatus());
restClient.deleteAlarm(savedAlarm.getId());
}
@Ignore
@Test
public void testTelemetry() throws Exception {
Device device = restClient.getTenantDevice("Edge Device 1").get();
DeviceCredentials deviceCredentials = restClient.getDeviceCredentialsByDeviceId(device.getId()).get();
ResponseEntity response = restClient.getRestTemplate()
.postForEntity(HTTPS_URL + "/api/v1/{credentialsId}/telemetry",
"{'test': 25}",
ResponseEntity.class,
deviceCredentials.getCredentialsId());
Assert.assertEquals(response.getStatusCode(), HttpStatus.OK);
Thread.sleep(1000);
List<String> keys = restClient.getTimeseriesKeys(device.getId());
List<TsKvEntry> latestTimeseries = restClient.getLatestTimeseries(device.getId(), keys);
Assert.assertEquals(1, latestTimeseries.size());
TsKvEntry tsKvEntry = latestTimeseries.get(0);
Map<UUID, TransportProtos.PostTelemetryMsg> telemetry = edgeImitator.getStorage().getLatestTelemetry();
Assert.assertEquals(1, telemetry.size());
Assert.assertTrue(telemetry.containsKey(device.getUuidId()));
TransportProtos.PostTelemetryMsg telemetryMsg = telemetry.get(device.getUuidId());
Assert.assertEquals(1, telemetryMsg.getTsKvListCount());
TransportProtos.TsKvListProto tsKv = telemetryMsg.getTsKvListList().get(0);
Assert.assertEquals(tsKvEntry.getTs(), tsKv.getTs());
Assert.assertEquals(1, tsKv.getKvCount());
TransportProtos.KeyValueProto keyValue = tsKv.getKvList().get(0);
Assert.assertEquals(tsKvEntry.getKey(), keyValue.getKey());
Assert.assertEquals(tsKvEntry.getValueAsString(), Long.toString(keyValue.getLongV()));
}
@AfterClass
public static void destroy() throws InterruptedException {
uninstallation();
edgeImitator.disconnect();
}
private static void installation() throws IOException {
Edge edge = new Edge();
edge.setName("Edge1");
edge.setType("test");
edge.setRoutingKey("routing");
edge.setSecret("secret");
Edge savedEdge = restClient.saveEdge(edge);
Device device = new Device();
device.setName("Edge Device 1");
device.setType("test");
Device savedDevice = restClient.saveDevice(device);
restClient.assignDeviceToEdge(savedEdge.getId(), savedDevice.getId());
Asset asset = new Asset();
asset.setName("Edge Asset 1");
asset.setType("test");
Asset savedAsset = restClient.saveAsset(asset);
restClient.assignAssetToEdge(savedEdge.getId(), savedAsset.getId());
ObjectMapper mapper = new ObjectMapper();
Class edgeTestClass = EdgeTest.class;
JsonNode configuration = mapper.readTree(edgeTestClass.getClassLoader().getResourceAsStream("RootRuleChain.json"));
RuleChain ruleChain = mapper.treeToValue(configuration.get("ruleChain"), RuleChain.class);
RuleChainMetaData ruleChainMetaData = mapper.treeToValue(configuration.get("metadata"), RuleChainMetaData.class);
RuleChain savedRuleChain = restClient.saveRuleChain(ruleChain);
ruleChainMetaData.setRuleChainId(savedRuleChain.getId());
restClient.saveRuleChainMetaData(ruleChainMetaData);
restClient.setRootRuleChain(savedRuleChain.getId());
}
private static void uninstallation() {
Device device = restClient.getTenantDevice("Edge Device 1").get();
restClient.deleteDevice(device.getId());
Asset asset = restClient.getTenantAsset("Edge Asset 1").get();
restClient.deleteAsset(asset.getId());
Edge edge = restClient.getTenantEdge("Edge1").get();
restClient.deleteEdge(edge.getId());
List<RuleChain> ruleChains = restClient.getRuleChains(new TextPageLink(3)).getData();
RuleChain oldRoot = ruleChains.stream().filter(ruleChain -> ruleChain.getName().equals("Root Rule Chain")).findAny().get();
RuleChain newRoot = ruleChains.stream().filter(ruleChain -> ruleChain.getName().equals("Test Root Rule Chain")).findAny().get();
restClient.setRootRuleChain(oldRoot.getId());
restClient.deleteRuleChain(newRoot.getId());
}
}

View File

@ -0,0 +1,133 @@
{
"ruleChain": {
"additionalInfo": null,
"name": "Test Root Rule Chain",
"type": "CORE",
"firstRuleNodeId": null,
"root": false,
"debugMode": false,
"configuration": null
},
"metadata": {
"firstNodeIndex": 4,
"nodes": [
{
"additionalInfo": {
"layoutX": 1117,
"layoutY": 156
},
"type": "org.thingsboard.rule.engine.edge.TbMsgPushToEdgeNode",
"name": "Push to edge",
"debugMode": false,
"configuration": {
"version": 0
}
},
{
"additionalInfo": {
"layoutX": 825,
"layoutY": 407
},
"type": "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode",
"name": "RPC Call Request",
"debugMode": false,
"configuration": {
"timeoutInSeconds": 60
}
},
{
"additionalInfo": {
"layoutX": 826,
"layoutY": 327
},
"type": "org.thingsboard.rule.engine.action.TbLogNode",
"name": "Log Other",
"debugMode": false,
"configuration": {
"jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);"
}
},
{
"additionalInfo": {
"layoutX": 827,
"layoutY": 244
},
"type": "org.thingsboard.rule.engine.action.TbLogNode",
"name": "Log RPC from Device",
"debugMode": false,
"configuration": {
"jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);"
}
},
{
"additionalInfo": {
"layoutX": 347,
"layoutY": 149
},
"type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode",
"name": "Message Type Switch",
"debugMode": false,
"configuration": {
"version": 0
}
},
{
"additionalInfo": {
"layoutX": 821,
"layoutY": 72
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode",
"name": "Save Client Attributes",
"debugMode": false,
"configuration": {
"scope": "CLIENT_SCOPE"
}
},
{
"additionalInfo": {
"layoutX": 824,
"layoutY": 156
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "Save Timeseries",
"debugMode": false,
"configuration": {
"defaultTTL": 0
}
}
],
"connections": [
{
"fromIndex": 4,
"toIndex": 1,
"type": "RPC Request to Device"
},
{
"fromIndex": 4,
"toIndex": 3,
"type": "RPC Request from Device"
},
{
"fromIndex": 4,
"toIndex": 6,
"type": "Post telemetry"
},
{
"fromIndex": 4,
"toIndex": 5,
"type": "Post attributes"
},
{
"fromIndex": 4,
"toIndex": 2,
"type": "Other"
},
{
"fromIndex": 6,
"toIndex": 0,
"type": "Success"
}
],
"ruleChainConnections": null
}
}