Merge branch 'feature/edge' of github.com:volodymyr-babak/thingsboard into feature/edge

This commit is contained in:
Volodymyr Babak 2020-09-25 15:22:10 +03:00
commit ca587ae7cc
8 changed files with 880 additions and 2 deletions

View File

@ -0,0 +1,416 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.edge;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.DashboardInfo;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import org.thingsboard.server.common.data.alarm.AlarmStatus;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.page.TimePageData;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.controller.AbstractControllerTest;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.edge.imitator.EdgeImitator;
import org.thingsboard.server.gen.edge.EdgeConfiguration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@Slf4j
abstract public class BaseEdgeTest extends AbstractControllerTest {
private Tenant savedTenant;
private TenantId tenantId;
private User tenantAdmin;
private EdgeImitator edgeImitator;
private Edge edge;
@Before
public void beforeTest() throws Exception {
loginSysAdmin();
Tenant tenant = new Tenant();
tenant.setTitle("My tenant");
savedTenant = doPost("/api/tenant", tenant, Tenant.class);
tenantId = savedTenant.getId();
Assert.assertNotNull(savedTenant);
tenantAdmin = new User();
tenantAdmin.setAuthority(Authority.TENANT_ADMIN);
tenantAdmin.setTenantId(savedTenant.getId());
tenantAdmin.setEmail("tenant2@thingsboard.org");
tenantAdmin.setFirstName("Joe");
tenantAdmin.setLastName("Downs");
tenantAdmin = createUserAndLogin(tenantAdmin, "testPassword1");
installation();
edgeImitator = new EdgeImitator("localhost", 7070, edge.getRoutingKey(), edge.getSecret());
// should be 3, but 3 events from sync service + 3 from controller. will be fixed in next releases
edgeImitator.getStorage().expectMessageAmount(6);
edgeImitator.connect();
}
@After
public void afterTest() throws Exception {
edgeImitator.disconnect();
uninstallation();
loginSysAdmin();
doDelete("/api/tenant/" + savedTenant.getId().getId().toString())
.andExpect(status().isOk());
}
@Test
public void test() throws Exception {
testReceivedInitialData();
testDevices();
testAssets();
testRuleChains();
testDashboards();
testRelations();
testAlarms();
}
private void testReceivedInitialData() throws Exception {
log.info("Checking received data");
edgeImitator.getStorage().waitForMessages();
EdgeConfiguration configuration = edgeImitator.getStorage().getConfiguration();
Assert.assertNotNull(configuration);
Map<UUID, EntityType> entities = edgeImitator.getStorage().getEntities();
Assert.assertFalse(entities.isEmpty());
Set<UUID> devices = edgeImitator.getStorage().getEntitiesByType(EntityType.DEVICE);
Assert.assertEquals(1, devices.size());
TimePageData<Device> pageDataDevices = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/devices?",
new TypeReference<TimePageData<Device>>() {}, new TextPageLink(100));
for (Device device: pageDataDevices.getData()) {
Assert.assertTrue(devices.contains(device.getUuidId()));
}
Set<UUID> assets = edgeImitator.getStorage().getEntitiesByType(EntityType.ASSET);
Assert.assertEquals(1, assets.size());
TimePageData<Asset> pageDataAssets = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/assets?",
new TypeReference<TimePageData<Asset>>() {}, new TextPageLink(100));
for (Asset asset: pageDataAssets.getData()) {
Assert.assertTrue(assets.contains(asset.getUuidId()));
}
Set<UUID> ruleChains = edgeImitator.getStorage().getEntitiesByType(EntityType.RULE_CHAIN);
Assert.assertEquals(1, ruleChains.size());
TimePageData<RuleChain> pageDataRuleChains = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/ruleChains?",
new TypeReference<TimePageData<RuleChain>>() {}, new TextPageLink(100));
for (RuleChain ruleChain: pageDataRuleChains.getData()) {
Assert.assertTrue(ruleChains.contains(ruleChain.getUuidId()));
}
log.info("Received data checked");
}
private void testDevices() throws Exception {
log.info("Testing devices");
Device device = new Device();
device.setName("Edge Device 2");
device.setType("test");
Device savedDevice = doPost("/api/device", device, Device.class);
edgeImitator.getStorage().expectMessageAmount(1);
doPost("/api/edge/" + edge.getId().getId().toString()
+ "/device/" + savedDevice.getId().getId().toString(), Device.class);
TimePageData<Device> pageDataDevices = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/devices?",
new TypeReference<TimePageData<Device>>() {}, new TextPageLink(100));
Assert.assertTrue(pageDataDevices.getData().contains(savedDevice));
edgeImitator.getStorage().waitForMessages();
Set<UUID> devices = edgeImitator.getStorage().getEntitiesByType(EntityType.DEVICE);
Assert.assertEquals(2, devices.size());
Assert.assertTrue(devices.contains(savedDevice.getUuidId()));
edgeImitator.getStorage().expectMessageAmount(1);
doDelete("/api/edge/" + edge.getId().getId().toString()
+ "/device/" + savedDevice.getId().getId().toString(), Device.class);
pageDataDevices = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/devices?",
new TypeReference<TimePageData<Device>>() {}, new TextPageLink(100));
Assert.assertFalse(pageDataDevices.getData().contains(savedDevice));
edgeImitator.getStorage().waitForMessages();
devices = edgeImitator.getStorage().getEntitiesByType(EntityType.DEVICE);
Assert.assertEquals(1, devices.size());
Assert.assertFalse(devices.contains(savedDevice.getUuidId()));
doDelete("/api/device/" + savedDevice.getId().getId().toString())
.andExpect(status().isOk());
log.info("Devices tested successfully");
}
private void testAssets() throws Exception {
log.info("Testing assets");
Asset asset = new Asset();
asset.setName("Edge Asset 2");
asset.setType("test");
Asset savedAsset = doPost("/api/asset", asset, Asset.class);
edgeImitator.getStorage().expectMessageAmount(1);
doPost("/api/edge/" + edge.getId().getId().toString()
+ "/asset/" + savedAsset.getId().getId().toString(), Asset.class);
TimePageData<Asset> pageDataAssets = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/assets?",
new TypeReference<TimePageData<Asset>>() {}, new TextPageLink(100));
Assert.assertTrue(pageDataAssets.getData().contains(savedAsset));
edgeImitator.getStorage().waitForMessages();
Set<UUID> assets = edgeImitator.getStorage().getEntitiesByType(EntityType.ASSET);
Assert.assertEquals(2, assets.size());
Assert.assertTrue(assets.contains(savedAsset.getUuidId()));
edgeImitator.getStorage().expectMessageAmount(1);
doDelete("/api/edge/" + edge.getId().getId().toString()
+ "/asset/" + savedAsset.getId().getId().toString(), Asset.class);
pageDataAssets = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/assets?",
new TypeReference<TimePageData<Asset>>() {}, new TextPageLink(100));
Assert.assertFalse(pageDataAssets.getData().contains(savedAsset));
edgeImitator.getStorage().waitForMessages();
assets = edgeImitator.getStorage().getEntitiesByType(EntityType.ASSET);
Assert.assertEquals(1, assets.size());
Assert.assertFalse(assets.contains(savedAsset.getUuidId()));
doDelete("/api/asset/" + savedAsset.getId().getId().toString())
.andExpect(status().isOk());
log.info("Assets tested successfully");
}
private void testRuleChains() throws Exception {
log.info("Testing RuleChains");
RuleChain ruleChain = new RuleChain();
ruleChain.setName("Edge Test Rule Chain");
ruleChain.setType(RuleChainType.EDGE);
RuleChain savedRuleChain = doPost("/api/ruleChain", ruleChain, RuleChain.class);
edgeImitator.getStorage().expectMessageAmount(1);
doPost("/api/edge/" + edge.getId().getId().toString()
+ "/ruleChain/" + savedRuleChain.getId().getId().toString(), RuleChain.class);
TimePageData<RuleChain> pageDataRuleChain = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/ruleChains?",
new TypeReference<TimePageData<RuleChain>>() {}, new TextPageLink(100));
Assert.assertTrue(pageDataRuleChain.getData().contains(savedRuleChain));
edgeImitator.getStorage().waitForMessages();
Set<UUID> ruleChains = edgeImitator.getStorage().getEntitiesByType(EntityType.RULE_CHAIN);
Assert.assertEquals(2, ruleChains.size());
Assert.assertTrue(ruleChains.contains(savedRuleChain.getUuidId()));
edgeImitator.getStorage().expectMessageAmount(1);
doDelete("/api/edge/" + edge.getId().getId().toString()
+ "/ruleChain/" + savedRuleChain.getId().getId().toString(), RuleChain.class);
pageDataRuleChain = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/ruleChains?",
new TypeReference<TimePageData<RuleChain>>() {}, new TextPageLink(100));
Assert.assertFalse(pageDataRuleChain.getData().contains(savedRuleChain));
edgeImitator.getStorage().waitForMessages();
ruleChains = edgeImitator.getStorage().getEntitiesByType(EntityType.RULE_CHAIN);
Assert.assertEquals(1, ruleChains.size());
Assert.assertFalse(ruleChains.contains(savedRuleChain.getUuidId()));
doDelete("/api/ruleChain/" + savedRuleChain.getId().getId().toString())
.andExpect(status().isOk());
log.info("RuleChains tested successfully");
}
private void testDashboards() throws Exception {
log.info("Testing Dashboards");
Dashboard dashboard = new Dashboard();
dashboard.setTitle("Edge Test Dashboard");
Dashboard savedDashboard = doPost("/api/dashboard", dashboard, Dashboard.class);
edgeImitator.getStorage().expectMessageAmount(1);
doPost("/api/edge/" + edge.getId().getId().toString()
+ "/dashboard/" + savedDashboard.getId().getId().toString(), Dashboard.class);
TimePageData<DashboardInfo> pageDataDashboard = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/dashboards?",
new TypeReference<TimePageData<DashboardInfo>>() {}, new TextPageLink(100));
Assert.assertTrue(pageDataDashboard.getData().stream().allMatch(dashboardInfo -> dashboardInfo.getUuidId().equals(savedDashboard.getUuidId())));
edgeImitator.getStorage().waitForMessages();
Set<UUID> dashboards = edgeImitator.getStorage().getEntitiesByType(EntityType.DASHBOARD);
Assert.assertEquals(1, dashboards.size());
Assert.assertTrue(dashboards.contains(savedDashboard.getUuidId()));
edgeImitator.getStorage().expectMessageAmount(1);
doDelete("/api/edge/" + edge.getId().getId().toString()
+ "/dashboard/" + savedDashboard.getId().getId().toString(), Dashboard.class);
pageDataDashboard = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/dashboards?",
new TypeReference<TimePageData<DashboardInfo>>() {}, new TextPageLink(100));
Assert.assertFalse(pageDataDashboard.getData().stream().anyMatch(dashboardInfo -> dashboardInfo.getUuidId().equals(savedDashboard.getUuidId())));
edgeImitator.getStorage().waitForMessages();
dashboards = edgeImitator.getStorage().getEntitiesByType(EntityType.DASHBOARD);
Assert.assertEquals(0, dashboards.size());
Assert.assertFalse(dashboards.contains(savedDashboard.getUuidId()));
doDelete("/api/dashboard/" + savedDashboard.getId().getId().toString())
.andExpect(status().isOk());
log.info("Dashboards tested successfully");
}
private void testRelations() throws Exception {
log.info("Testing Relations");
List<Device> edgeDevices = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/devices?",
new TypeReference<TimePageData<Device>>() {}, new TextPageLink(100)).getData();
List<Asset> edgeAssets = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/assets?",
new TypeReference<TimePageData<Asset>>() {}, new TextPageLink(100)).getData();
Assert.assertEquals(1, edgeDevices.size());
Assert.assertEquals(1, edgeAssets.size());
Device device = edgeDevices.get(0);
Asset asset = edgeAssets.get(0);
Assert.assertEquals("Edge Device 1", device.getName());
Assert.assertEquals("Edge Asset 1", asset.getName());
EntityRelation relation = new EntityRelation();
relation.setType("test");
relation.setFrom(device.getId());
relation.setTo(asset.getId());
relation.setTypeGroup(RelationTypeGroup.COMMON);
edgeImitator.getStorage().expectMessageAmount(1);
doPost("/api/relation", relation);
edgeImitator.getStorage().waitForMessages();
List<EntityRelation> relations = edgeImitator.getStorage().getRelations();
Assert.assertEquals(1, relations.size());
Assert.assertTrue(relations.contains(relation));
edgeImitator.getStorage().expectMessageAmount(1);
doDelete("/api/relation?" +
"fromId=" + relation.getFrom().getId().toString() +
"&fromType=" + relation.getFrom().getEntityType().name() +
"&relationType=" + relation.getType() +
"&relationTypeGroup=" + relation.getTypeGroup().name() +
"&toId=" + relation.getTo().getId().toString() +
"&toType=" + relation.getTo().getEntityType().name())
.andExpect(status().isOk());
edgeImitator.getStorage().waitForMessages();
relations = edgeImitator.getStorage().getRelations();
Assert.assertEquals(0, relations.size());
Assert.assertFalse(relations.contains(relation));
log.info("Relations tested successfully");
}
private void testAlarms() throws Exception {
log.info("Testing Alarms");
List<Device> edgeDevices = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/devices?",
new TypeReference<TimePageData<Device>>() {}, new TextPageLink(100)).getData();
Assert.assertEquals(1, edgeDevices.size());
Device device = edgeDevices.get(0);
Assert.assertEquals("Edge Device 1", device.getName());
Alarm alarm = new Alarm();
alarm.setOriginator(device.getId());
alarm.setStatus(AlarmStatus.ACTIVE_UNACK);
alarm.setType("alarm");
alarm.setSeverity(AlarmSeverity.CRITICAL);
edgeImitator.getStorage().expectMessageAmount(1);
Alarm savedAlarm = doPost("/api/alarm", alarm, Alarm.class);
AlarmInfo alarmInfo = doGet("/api/alarm/info/" + savedAlarm.getId().getId().toString(), AlarmInfo.class);
edgeImitator.getStorage().waitForMessages();
Assert.assertEquals(1, edgeImitator.getStorage().getAlarms().size());
Assert.assertTrue(edgeImitator.getStorage().getAlarms().containsKey(alarmInfo.getType()));
Assert.assertEquals(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()), alarmInfo.getStatus());
edgeImitator.getStorage().expectMessageAmount(1);
doPost("/api/alarm/" + savedAlarm.getId().getId().toString() + "/ack");
edgeImitator.getStorage().waitForMessages();
alarmInfo = doGet("/api/alarm/info/" + savedAlarm.getId().getId().toString(), AlarmInfo.class);
Assert.assertTrue(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()).isAck());
Assert.assertEquals(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()), alarmInfo.getStatus());
edgeImitator.getStorage().expectMessageAmount(1);
doPost("/api/alarm/" + savedAlarm.getId().getId().toString() + "/clear");
edgeImitator.getStorage().waitForMessages();
alarmInfo = doGet("/api/alarm/info/" + savedAlarm.getId().getId().toString(), AlarmInfo.class);
Assert.assertTrue(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()).isAck());
Assert.assertTrue(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()).isCleared());
Assert.assertEquals(edgeImitator.getStorage().getAlarms().get(alarmInfo.getType()), alarmInfo.getStatus());
doDelete("/api/alarm/" + savedAlarm.getId().getId().toString())
.andExpect(status().isOk());
log.info("Alarms tested successfully");
}
private void installation() throws Exception {
edge = doPost("/api/edge", constructEdge("Test Edge", "test"), Edge.class);
Device device = new Device();
device.setName("Edge Device 1");
device.setType("test");
Device savedDevice = doPost("/api/device", device, Device.class);
doPost("/api/edge/" + edge.getId().getId().toString()
+ "/device/" + savedDevice.getId().getId().toString(), Device.class);
Asset asset = new Asset();
asset.setName("Edge Asset 1");
asset.setType("test");
Asset savedAsset = doPost("/api/asset", asset, Asset.class);
doPost("/api/edge/" + edge.getId().getId().toString()
+ "/asset/" + savedAsset.getId().getId().toString(), Asset.class);
}
private void uninstallation() throws Exception {
TimePageData<Device> pageDataDevices = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/devices?",
new TypeReference<TimePageData<Device>>() {}, new TextPageLink(100));
for (Device device: pageDataDevices.getData()) {
doDelete("/api/device/" + device.getId().getId().toString())
.andExpect(status().isOk());
}
TimePageData<Asset> pageDataAssets = doGetTypedWithPageLink("/api/edge/" + edge.getId().getId().toString() + "/assets?",
new TypeReference<TimePageData<Asset>>() {}, new TextPageLink(100));
for (Asset asset: pageDataAssets.getData()) {
doDelete("/api/asset/" + asset.getId().getId().toString())
.andExpect(status().isOk());
}
doDelete("/api/edge/" + edge.getId().getId().toString())
.andExpect(status().isOk());
}
}

View File

@ -0,0 +1,47 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.edge;
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.extensions.cpsuite.ClasspathSuite;
import org.junit.runner.RunWith;
import org.thingsboard.server.dao.CustomCassandraCQLUnit;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import java.util.Arrays;
@RunWith(ClasspathSuite.class)
@ClasspathSuite.ClassnameFilters({
"org.thingsboard.server.edge.nosql.*Test"})
public class EdgeNoSqlTestSuite {
@ClassRule
public static CustomCassandraCQLUnit cassandraUnit =
new CustomCassandraCQLUnit(
Arrays.asList(
new ClassPathCQLDataSet("cassandra/schema-ts.cql", false, false),
new ClassPathCQLDataSet("cassandra/schema-entities.cql", false, false),
new ClassPathCQLDataSet("cassandra/system-data.cql", false, false),
new ClassPathCQLDataSet("cassandra/system-test.cql", false, false)),
"cassandra-test.yaml", 30000l);
@BeforeClass
public static void cleanupInMemStorage(){
InMemoryStorage.getInstance().cleanup();
}
}

View File

@ -0,0 +1,41 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.edge;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.extensions.cpsuite.ClasspathSuite;
import org.junit.runner.RunWith;
import org.thingsboard.server.dao.CustomSqlUnit;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import java.util.Arrays;
@RunWith(ClasspathSuite.class)
@ClasspathSuite.ClassnameFilters({"org.thingsboard.server.edge.sql.*Test"})
public class EdgeSqlTestSuite {
@ClassRule
public static CustomSqlUnit sqlUnit = new CustomSqlUnit(
Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql"),
"sql/hsql/drop-all-tables.sql",
"sql-test.properties");
@BeforeClass
public static void cleanupInMemStorage(){
InMemoryStorage.getInstance().cleanup();
}
}

View File

@ -0,0 +1,148 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.edge.imitator;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.thingsboard.edge.rpc.EdgeGrpcClient;
import org.thingsboard.edge.rpc.EdgeRpcClient;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.gen.edge.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.AssetUpdateMsg;
import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.DownlinkResponseMsg;
import org.thingsboard.server.gen.edge.EdgeConfiguration;
import org.thingsboard.server.gen.edge.RelationUpdateMsg;
import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
import org.thingsboard.server.gen.edge.UplinkResponseMsg;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@Slf4j
public class EdgeImitator {
private String routingKey;
private String routingSecret;
private EdgeRpcClient edgeRpcClient;
@Getter
private EdgeStorage storage;
public EdgeImitator(String host, int port, String routingKey, String routingSecret) throws NoSuchFieldException, IllegalAccessException {
edgeRpcClient = new EdgeGrpcClient();
storage = new EdgeStorage();
this.routingKey = routingKey;
this.routingSecret = routingSecret;
setEdgeCredentials("rpcHost", host);
setEdgeCredentials("rpcPort", port);
}
private void setEdgeCredentials(String fieldName, Object value) throws NoSuchFieldException, IllegalAccessException {
Field fieldToSet = edgeRpcClient.getClass().getDeclaredField(fieldName);
fieldToSet.setAccessible(true);
fieldToSet.set(edgeRpcClient, value);
fieldToSet.setAccessible(false);
}
public void connect() {
edgeRpcClient.connect(routingKey, routingSecret,
this::onUplinkResponse,
this::onEdgeUpdate,
this::onDownlink,
this::onClose);
}
public void disconnect() throws InterruptedException {
edgeRpcClient.disconnect();
}
private void onUplinkResponse(UplinkResponseMsg msg) {
log.info("onUplinkResponse: {}", msg);
}
private void onEdgeUpdate(EdgeConfiguration edgeConfiguration) {
storage.setConfiguration(edgeConfiguration);
}
private void onDownlink(DownlinkMsg downlinkMsg) {
ListenableFuture<List<Void>> future = processDownlinkMsg(downlinkMsg);
Futures.addCallback(future, new FutureCallback<List<Void>>() {
@Override
public void onSuccess(@Nullable List<Void> result) {
DownlinkResponseMsg downlinkResponseMsg = DownlinkResponseMsg.newBuilder().setSuccess(true).build();
edgeRpcClient.sendDownlinkResponseMsg(downlinkResponseMsg);
}
@Override
public void onFailure(Throwable t) {
DownlinkResponseMsg downlinkResponseMsg = DownlinkResponseMsg.newBuilder().setSuccess(false).setErrorMsg(t.getMessage()).build();
edgeRpcClient.sendDownlinkResponseMsg(downlinkResponseMsg);
}
}, MoreExecutors.directExecutor());
}
private void onClose(Exception e) {
log.info("onClose: {}", e.getMessage());
}
private ListenableFuture<List<Void>> processDownlinkMsg(DownlinkMsg downlinkMsg) {
List<ListenableFuture<Void>> result = new ArrayList<>();
if (downlinkMsg.getDeviceUpdateMsgList() != null && !downlinkMsg.getDeviceUpdateMsgList().isEmpty()) {
for (DeviceUpdateMsg deviceUpdateMsg: downlinkMsg.getDeviceUpdateMsgList()) {
result.add(storage.processEntity(deviceUpdateMsg.getMsgType(), EntityType.DEVICE, new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())));
}
}
if (downlinkMsg.getAssetUpdateMsgList() != null && !downlinkMsg.getAssetUpdateMsgList().isEmpty()) {
for (AssetUpdateMsg assetUpdateMsg: downlinkMsg.getAssetUpdateMsgList()) {
result.add(storage.processEntity(assetUpdateMsg.getMsgType(), EntityType.ASSET, new UUID(assetUpdateMsg.getIdMSB(), assetUpdateMsg.getIdLSB())));
}
}
if (downlinkMsg.getRuleChainUpdateMsgList() != null && !downlinkMsg.getRuleChainUpdateMsgList().isEmpty()) {
for (RuleChainUpdateMsg ruleChainUpdateMsg: downlinkMsg.getRuleChainUpdateMsgList()) {
result.add(storage.processEntity(ruleChainUpdateMsg.getMsgType(), EntityType.RULE_CHAIN, new UUID(ruleChainUpdateMsg.getIdMSB(), ruleChainUpdateMsg.getIdLSB())));
}
}
if (downlinkMsg.getDashboardUpdateMsgList() != null && !downlinkMsg.getDashboardUpdateMsgList().isEmpty()) {
for (DashboardUpdateMsg dashboardUpdateMsg: downlinkMsg.getDashboardUpdateMsgList()) {
result.add(storage.processEntity(dashboardUpdateMsg.getMsgType(), EntityType.DASHBOARD, new UUID(dashboardUpdateMsg.getIdMSB(), dashboardUpdateMsg.getIdLSB())));
}
}
if (downlinkMsg.getRelationUpdateMsgList() != null && !downlinkMsg.getRelationUpdateMsgList().isEmpty()) {
for (RelationUpdateMsg relationUpdateMsg: downlinkMsg.getRelationUpdateMsgList()) {
result.add(storage.processRelation(relationUpdateMsg));
}
}
if (downlinkMsg.getAlarmUpdateMsgList() != null && !downlinkMsg.getAlarmUpdateMsgList().isEmpty()) {
for (AlarmUpdateMsg alarmUpdateMsg: downlinkMsg.getAlarmUpdateMsgList()) {
result.add(storage.processAlarm(alarmUpdateMsg));
}
}
return Futures.allAsList(result);
}
}

View File

@ -0,0 +1,133 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.edge.imitator;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.alarm.AlarmStatus;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.gen.edge.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.EdgeConfiguration;
import org.thingsboard.server.gen.edge.RelationUpdateMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
@Getter
@Setter
public class EdgeStorage {
private EdgeConfiguration configuration;
private CountDownLatch latch;
private Map<UUID, EntityType> entities;
private Map<String, AlarmStatus> alarms;
private List<EntityRelation> relations;
public EdgeStorage() {
latch = new CountDownLatch(0);
entities = new HashMap<>();
alarms = new HashMap<>();
relations = new ArrayList<>();
}
public ListenableFuture<Void> processEntity(UpdateMsgType msgType, EntityType type, UUID uuid) {
switch (msgType) {
case ENTITY_CREATED_RPC_MESSAGE:
case ENTITY_UPDATED_RPC_MESSAGE:
entities.put(uuid, type);
latch.countDown();
break;
case ENTITY_DELETED_RPC_MESSAGE:
if (entities.remove(uuid) != null) {
latch.countDown();
}
break;
}
return Futures.immediateFuture(null);
}
public ListenableFuture<Void> processRelation(RelationUpdateMsg relationMsg) {
boolean result = false;
EntityRelation relation = new EntityRelation();
relation.setType(relationMsg.getType());
relation.setTypeGroup(RelationTypeGroup.valueOf(relationMsg.getTypeGroup()));
relation.setTo(EntityIdFactory.getByTypeAndUuid(relationMsg.getToEntityType(), new UUID(relationMsg.getToIdMSB(), relationMsg.getToIdLSB())));
relation.setFrom(EntityIdFactory.getByTypeAndUuid(relationMsg.getFromEntityType(), new UUID(relationMsg.getFromIdMSB(), relationMsg.getFromIdLSB())));
switch (relationMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE:
case ENTITY_UPDATED_RPC_MESSAGE:
result = relations.add(relation);
break;
case ENTITY_DELETED_RPC_MESSAGE:
result = relations.remove(relation);
break;
}
if (result) {
latch.countDown();
}
return Futures.immediateFuture(null);
}
public ListenableFuture<Void> processAlarm(AlarmUpdateMsg alarmMsg) {
switch (alarmMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE:
case ENTITY_UPDATED_RPC_MESSAGE:
case ALARM_ACK_RPC_MESSAGE:
case ALARM_CLEAR_RPC_MESSAGE:
alarms.put(alarmMsg.getType(), AlarmStatus.valueOf(alarmMsg.getStatus()));
latch.countDown();
break;
case ENTITY_DELETED_RPC_MESSAGE:
if (alarms.remove(alarmMsg.getName()) != null) {
latch.countDown();
}
break;
}
return Futures.immediateFuture(null);
}
public Set<UUID> getEntitiesByType(EntityType type) {
return entities.entrySet().stream()
.filter(entry -> entry.getValue().equals(type))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)).keySet();
}
public void waitForMessages() throws InterruptedException {
latch.await(5, TimeUnit.SECONDS);
}
public void expectMessageAmount(int messageAmount) {
latch = new CountDownLatch(messageAmount);
}
}

View File

@ -0,0 +1,23 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.edge.nosql;
import org.thingsboard.server.dao.service.DaoNoSqlTest;
import org.thingsboard.server.edge.BaseEdgeTest;
@DaoNoSqlTest
public class EdgeNoSqlTest extends BaseEdgeTest {
}

View File

@ -0,0 +1,23 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.edge.sql;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.edge.BaseEdgeTest;
@DaoSqlTest
public class EdgeSqlTest extends BaseEdgeTest {
}

View File

@ -2029,9 +2029,9 @@ public class RestClient implements ClientHttpRequestInterceptor, Closeable {
}
}
public Optional<RuleChain> setRootRuleChain(RuleChainId ruleChainId, EdgeId edgeId) {
public Optional<Edge> setRootRuleChain(EdgeId edgeId, RuleChainId ruleChainId) {
try {
ResponseEntity<RuleChain> ruleChain = restTemplate.postForEntity(baseURL + "/api/edge/{edgeId}/{ruleChainId}/root", null, RuleChain.class, edgeId.getId(), ruleChainId.getId());
ResponseEntity<Edge> ruleChain = restTemplate.postForEntity(baseURL + "/api/edge/{edgeId}/{ruleChainId}/root", null, Edge.class, edgeId.getId(), ruleChainId.getId());
return Optional.ofNullable(ruleChain.getBody());
} catch (HttpClientErrorException exception) {
if (exception.getStatusCode() == HttpStatus.NOT_FOUND) {
@ -2251,6 +2251,53 @@ public class RestClient implements ClientHttpRequestInterceptor, Closeable {
}, params).getBody();
}
public Optional<RuleChain> addDefaultEdgeRuleChain(RuleChainId ruleChainId) {
try {
ResponseEntity<RuleChain> ruleChain = restTemplate.postForEntity(baseURL + "/api/ruleChain/{ruleChainId}/defaultEdge", null, RuleChain.class, ruleChainId.getId());
return Optional.ofNullable(ruleChain.getBody());
} catch (HttpClientErrorException exception) {
if (exception.getStatusCode() == HttpStatus.NOT_FOUND) {
return Optional.empty();
} else {
throw exception;
}
}
}
public Optional<RuleChain> removeDefaultEdgeRuleChain(RuleChainId ruleChainId) {
try {
ResponseEntity<RuleChain> ruleChain = restTemplate.exchange(baseURL + "/api/ruleChain/{ruleChainId}/defaultEdge", HttpMethod.DELETE, HttpEntity.EMPTY, RuleChain.class, ruleChainId.getId());
return Optional.ofNullable(ruleChain.getBody());
} catch (HttpClientErrorException exception) {
if (exception.getStatusCode() == HttpStatus.NOT_FOUND) {
return Optional.empty();
} else {
throw exception;
}
}
}
public List<RuleChain> getDefaultEdgeRuleChains() {
return restTemplate.exchange(baseURL + "/api/ruleChain/defaultEdgeRuleChains",
HttpMethod.GET,
HttpEntity.EMPTY,
new ParameterizedTypeReference<List<RuleChain>>() {
}).getBody();
}
public Optional<RuleChain> setDefaultRootEdgeRuleChain(RuleChainId ruleChainId) {
try {
ResponseEntity<RuleChain> ruleChain = restTemplate.postForEntity(baseURL + "/api/ruleChain/{ruleChainId}/defaultRootEdge", null, RuleChain.class, ruleChainId.getId());
return Optional.ofNullable(ruleChain.getBody());
} catch (HttpClientErrorException exception) {
if (exception.getStatusCode() == HttpStatus.NOT_FOUND) {
return Optional.empty();
} else {
throw exception;
}
}
}
public TextPageData<Edge> getTenantEdges(String type, TextPageLink pageLink) {
Map<String, String> params = new HashMap<>();
params.put("type", type);