Merge branch 'master' of github.com:thingsboard/thingsboard

This commit is contained in:
Andrii Shvaika 2020-10-08 10:57:58 +03:00
commit 0cc52cd129
13 changed files with 114 additions and 52 deletions

View File

@ -47,6 +47,7 @@ import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
import java.util.List;
import java.util.Optional;
@ -116,6 +117,9 @@ public class TenantActor extends RuleChainManagerActor {
if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) {
QueueToRuleEngineMsg queueMsg = (QueueToRuleEngineMsg) msg;
queueMsg.getTbMsg().getCallback().onSuccess();
} else if (msg.getMsgType().equals(MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG)){
TransportToDeviceActorMsgWrapper transportMsg = (TransportToDeviceActorMsgWrapper) msg;
transportMsg.getCallback().onSuccess();
}
return true;
}

View File

@ -42,6 +42,7 @@ import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -213,4 +214,33 @@ public abstract class AbstractMqttIntegrationTest extends AbstractControllerTest
return builder.build();
}
protected <T> T doExecuteWithRetriesAndInterval(SupplierWithThrowable<T> supplier, int retries, int intervalMs) throws Exception {
int count = 0;
T result = null;
Throwable lastException = null;
while (count < retries) {
try {
result = supplier.get();
if (result != null) {
return result;
}
} catch (Throwable e) {
lastException = e;
}
count++;
if (count < retries) {
Thread.sleep(intervalMs);
}
}
if (lastException != null) {
throw new RuntimeException(lastException);
} else {
return result;
}
}
@FunctionalInterface
public interface SupplierWithThrowable<T> {
T get() throws Throwable;
}
}

View File

@ -84,10 +84,14 @@ public abstract class AbstractMqttAttributesRequestIntegrationTest extends Abstr
postGatewayDeviceClientAttributes(client);
Device savedDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + "Gateway Device Request Attributes", Device.class),
20,
100);
assertNotNull(savedDevice);
Thread.sleep(1000);
Device savedDevice = doGet("/api/tenant/devices?deviceName=" + "Gateway Device Request Attributes", Device.class);
assertNotNull(savedDevice);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
Thread.sleep(1000);

View File

@ -84,7 +84,7 @@ public abstract class AbstractMqttAttributesUpdatesIntegrationTest extends Abstr
client.subscribe(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE.value());
Thread.sleep(2000);
Thread.sleep(1000);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
onUpdateCallback.getLatch().await(3, TimeUnit.SECONDS);
@ -127,14 +127,15 @@ public abstract class AbstractMqttAttributesUpdatesIntegrationTest extends Abstr
publishMqttMsg(client, connectPayloadBytes, MqttTopics.GATEWAY_CONNECT_TOPIC);
Thread.sleep(1000);
Device savedDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + "Gateway Device Subscribe to attribute updates", Device.class),
20,
100);
Device savedDevice = doGet("/api/tenant/devices?deviceName=" + "Gateway Device Subscribe to attribute updates", Device.class);
assertNotNull(savedDevice);
client.subscribe(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE.value());
Thread.sleep(2000);
Thread.sleep(1000);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
onUpdateCallback.getLatch().await(3, TimeUnit.SECONDS);

View File

@ -75,25 +75,21 @@ public abstract class AbstractMqttClaimDeviceTest extends AbstractMqttIntegratio
}
@Test
@Ignore
public void testClaimingDevice() throws Exception {
processTestClaimingDevice(false);
}
@Test
@Ignore
public void testClaimingDeviceWithoutSecretAndDuration() throws Exception {
processTestClaimingDevice(true);
}
@Test
@Ignore
public void testGatewayClaimingDevice() throws Exception {
processTestGatewayClaimingDevice("Test claiming gateway device", false);
}
@Test
@Ignore
public void testGatewayClaimingDeviceWithoutSecretAndDuration() throws Exception {
processTestGatewayClaimingDevice("Test claiming gateway device empty payload", true);
}
@ -116,8 +112,6 @@ public abstract class AbstractMqttClaimDeviceTest extends AbstractMqttIntegratio
protected void validateClaimResponse(boolean emptyPayload, MqttAsyncClient client, byte[] payloadBytes, byte[] failurePayloadBytes) throws Exception {
client.publish(MqttTopics.DEVICE_CLAIM_TOPIC, new MqttMessage(failurePayloadBytes));
Thread.sleep(2000);
loginUser(customerAdmin.getName(), CUSTOMER_USER_PASSWORD);
ClaimRequest claimRequest;
if (!emptyPayload) {
@ -126,14 +120,21 @@ public abstract class AbstractMqttClaimDeviceTest extends AbstractMqttIntegratio
claimRequest = new ClaimRequest(null);
}
ClaimResponse claimResponse = doPostClaimAsync("/api/customer/device/" + savedDevice.getName() + "/claim", claimRequest, ClaimResponse.class, status().isBadRequest());
ClaimResponse claimResponse = doExecuteWithRetriesAndInterval(
() -> doPostClaimAsync("/api/customer/device/" + savedDevice.getName() + "/claim", claimRequest, ClaimResponse.class, status().isBadRequest()),
20,
100
);
assertEquals(claimResponse, ClaimResponse.FAILURE);
client.publish(MqttTopics.DEVICE_CLAIM_TOPIC, new MqttMessage(payloadBytes));
Thread.sleep(2000);
ClaimResult claimResult = doPostClaimAsync("/api/customer/device/" + savedDevice.getName() + "/claim", claimRequest, ClaimResult.class, status().isOk());
ClaimResult claimResult = doExecuteWithRetriesAndInterval(
() -> doPostClaimAsync("/api/customer/device/" + savedDevice.getName() + "/claim", claimRequest, ClaimResult.class, status().isOk()),
20,
100
);
assertEquals(claimResult.getResponse(), ClaimResponse.SUCCESS);
Device claimedDevice = claimResult.getDevice();
assertNotNull(claimedDevice);
@ -147,9 +148,12 @@ public abstract class AbstractMqttClaimDeviceTest extends AbstractMqttIntegratio
protected void validateGatewayClaimResponse(String deviceName, boolean emptyPayload, MqttAsyncClient client, byte[] failurePayloadBytes, byte[] payloadBytes) throws Exception {
client.publish(MqttTopics.GATEWAY_CLAIM_TOPIC, new MqttMessage(failurePayloadBytes));
Thread.sleep(2000);
Device savedDevice = doExecuteWithRetriesAndInterval(
() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class),
20,
100
);
Device savedDevice = doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class);
assertNotNull(savedDevice);
loginUser(customerAdmin.getName(), CUSTOMER_USER_PASSWORD);
@ -165,9 +169,12 @@ public abstract class AbstractMqttClaimDeviceTest extends AbstractMqttIntegratio
client.publish(MqttTopics.GATEWAY_CLAIM_TOPIC, new MqttMessage(payloadBytes));
Thread.sleep(2000);
ClaimResult claimResult = doExecuteWithRetriesAndInterval(
() -> doPostClaimAsync("/api/customer/device/" + deviceName + "/claim", claimRequest, ClaimResult.class, status().isOk()),
20,
100
);
ClaimResult claimResult = doPostClaimAsync("/api/customer/device/" + deviceName + "/claim", claimRequest, ClaimResult.class, status().isOk());
assertEquals(claimResult.getResponse(), ClaimResponse.SUCCESS);
Device claimedDevice = claimResult.getDevice();
assertNotNull(claimedDevice);

View File

@ -52,7 +52,6 @@ public abstract class AbstractMqttClaimJsonDeviceTest extends AbstractMqttClaimD
}
@Test
@Ignore
public void testGatewayClaimingDeviceWithoutSecretAndDuration() throws Exception {
processTestGatewayClaimingDevice("Test claiming gateway device empty payload Json", true);
}

View File

@ -37,25 +37,21 @@ public abstract class AbstractMqttClaimProtoDeviceTest extends AbstractMqttClaim
public void afterTest() throws Exception { super.afterTest(); }
@Test
@Ignore
public void testClaimingDevice() throws Exception {
processTestClaimingDevice(false);
}
@Test
@Ignore
public void testClaimingDeviceWithoutSecretAndDuration() throws Exception {
processTestClaimingDevice(true);
}
@Test
@Ignore
public void testGatewayClaimingDevice() throws Exception {
processTestGatewayClaimingDevice("Test claiming gateway device Proto", false);
}
@Test
@Ignore
public void testGatewayClaimingDeviceWithoutSecretAndDuration() throws Exception {
processTestGatewayClaimingDevice("Test claiming gateway device empty payload Proto", true);
}

View File

@ -84,7 +84,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, MqttQoS.AT_MOST_ONCE.value());
Thread.sleep(2000);
Thread.sleep(1000);
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
String deviceId = savedDevice.getId().getId().toString();
@ -109,7 +109,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
TestMqttCallback callback = new TestMqttCallback(client, latch);
client.setCallback(callback);
Thread.sleep(2000);
Thread.sleep(1000);
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}";
String deviceId = savedDevice.getId().getId().toString();
@ -132,9 +132,11 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
protected void validateOneWayRpcGatewayResponse(String deviceName, MqttAsyncClient client, byte[] payloadBytes) throws Exception {
publishMqttMsg(client, payloadBytes, MqttTopics.GATEWAY_CONNECT_TOPIC);
Thread.sleep(2000);
Device savedDevice = getDeviceByName(deviceName);
Device savedDevice = doExecuteWithRetriesAndInterval(
() -> getDeviceByName(deviceName),
20,
100
);
assertNotNull(savedDevice);
CountDownLatch latch = new CountDownLatch(1);
@ -143,7 +145,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
client.subscribe(MqttTopics.GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE.value());
Thread.sleep(2000);
Thread.sleep(1000);
String setGpioRequest = "{\"method\": \"toggle_gpio\", \"params\": {\"pin\":1}}";
String deviceId = savedDevice.getId().getId().toString();
@ -156,9 +158,11 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
protected void validateTwoWayRpcGateway(String deviceName, MqttAsyncClient client, byte[] payloadBytes) throws Exception {
publishMqttMsg(client, payloadBytes, MqttTopics.GATEWAY_CONNECT_TOPIC);
Thread.sleep(2000);
Device savedDevice = getDeviceByName(deviceName);
Device savedDevice = doExecuteWithRetriesAndInterval(
() -> getDeviceByName(deviceName),
20,
100
);
assertNotNull(savedDevice);
CountDownLatch latch = new CountDownLatch(1);
@ -167,7 +171,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
client.subscribe(MqttTopics.GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE.value());
Thread.sleep(2000);
Thread.sleep(1000);
String setGpioRequest = "{\"method\": \"toggle_gpio\", \"params\": {\"pin\":1}}";
String deviceId = savedDevice.getId().getId().toString();

View File

@ -75,7 +75,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
DeviceId deviceId = savedDevice.getId();
long start = System.currentTimeMillis();
long end = System.currentTimeMillis() + 2000;
long end = System.currentTimeMillis() + 5000;
List<String> actualKeys = null;
while (start <= end) {
@ -106,13 +106,20 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
publishMqttMsg(client, payload, MqttTopics.GATEWAY_ATTRIBUTES_TOPIC);
Thread.sleep(2000);
Device firstDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + firstDeviceName, Device.class),
20,
100);
Device firstDevice = doGet("/api/tenant/devices?deviceName=" + firstDeviceName, Device.class);
assertNotNull(firstDevice);
Device secondDevice = doGet("/api/tenant/devices?deviceName=" + secondDeviceName, Device.class);
Device secondDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + secondDeviceName, Device.class),
20,
100);
assertNotNull(secondDevice);
Thread.sleep(2000);
List<String> firstDeviceActualKeys = doGetAsync("/api/plugins/telemetry/DEVICE/" + firstDevice.getId() + "/keys/attributes/CLIENT_SCOPE", List.class);
Set<String> firstDeviceActualKeySet = new HashSet<>(firstDeviceActualKeys);

View File

@ -88,10 +88,12 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
publishMqttMsg(client, payload.getBytes(), MqttTopics.GATEWAY_CONNECT_TOPIC);
Thread.sleep(2000);
String deviceName = "Device A";
Device device = doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class);
Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class),
20,
100);
assertNotNull(device);
}
@ -102,7 +104,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
String deviceId = savedDevice.getId().getId().toString();
long start = System.currentTimeMillis();
long end = System.currentTimeMillis() + 2000;
long end = System.currentTimeMillis() + 5000;
List<String> actualKeys = null;
while (start <= end) {
@ -139,13 +141,20 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
publishMqttMsg(client, payload, topic);
Thread.sleep(2000);
Device firstDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + firstDeviceName, Device.class),
20,
100);
Device firstDevice = doGet("/api/tenant/devices?deviceName=" + firstDeviceName, Device.class);
assertNotNull(firstDevice);
Device secondDevice = doGet("/api/tenant/devices?deviceName=" + secondDeviceName, Device.class);
Device secondDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + secondDeviceName, Device.class),
20,
100);
assertNotNull(secondDevice);
Thread.sleep(2000);
List<String> firstDeviceActualKeys = doGetAsync("/api/plugins/telemetry/DEVICE/" + firstDevice.getId() + "/keys/timeseries", List.class);
Set<String> firstDeviceActualKeySet = new HashSet<>(firstDeviceActualKeys);

View File

@ -73,10 +73,10 @@ public abstract class AbstractMqttTimeseriesJsonIntegrationTest extends Abstract
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
publishMqttMsg(client, payload.getBytes(), MqttTopics.GATEWAY_CONNECT_TOPIC);
Thread.sleep(2000);
String deviceName = "Device A";
Device device = doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class);
Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class),
20,
100);
assertNotNull(device);
}
}

View File

@ -81,9 +81,10 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
publishMqttMsg(client, connectMsgProto.toByteArray(), MqttTopics.GATEWAY_CONNECT_TOPIC);
Thread.sleep(2000);
Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class),
20,
100);
Device device = doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class);
assertNotNull(device);
}

View File

@ -19,5 +19,5 @@ package org.thingsboard.server.common.data;
* @author Andrew Shvayka
*/
public enum EntityType {
TENANT, TENANT_PROFILE, CUSTOMER, USER, DASHBOARD, ASSET, DEVICE, DEVICE_PROFILE, ALARM, RULE_CHAIN, RULE_NODE, ENTITY_VIEW, WIDGETS_BUNDLE, WIDGET_TYPE
TENANT, CUSTOMER, USER, DASHBOARD, ASSET, DEVICE, ALARM, RULE_CHAIN, RULE_NODE, ENTITY_VIEW, WIDGETS_BUNDLE, WIDGET_TYPE, TENANT_PROFILE, DEVICE_PROFILE
}