added new dynamic-schema based tests for telemetry and attributes upload

This commit is contained in:
ShvaykaD 2021-06-14 12:53:13 +03:00 committed by Andrew Shvayka
parent a1336f3893
commit c4133fa41a
16 changed files with 678 additions and 101 deletions

View File

@ -634,6 +634,72 @@ public abstract class BaseDeviceProfileControllerTest extends AbstractController
dynamicMsgToJson(sampleMsgDescriptor, sampleMsgWithOneOfSubMessage.toByteArray()));
}
@Test
public void testSaveProtoDeviceProfileWithInvalidTelemetrySchemaTsField() throws Exception {
testSaveDeviceProfileWithInvalidProtoSchema("syntax =\"proto3\";\n" +
"\n" +
"package schemavalidation;\n" +
"\n" +
"message PostTelemetry {\n" +
" int64 ts = 1;\n" +
" Values values = 2;\n" +
" \n" +
" message Values {\n" +
" string key1 = 3;\n" +
" bool key2 = 4;\n" +
" double key3 = 5;\n" +
" int32 key4 = 6;\n" +
" JsonObject key5 = 7;\n" +
" }\n" +
" \n" +
" message JsonObject {\n" +
" optional int32 someNumber = 8;\n" +
" repeated int32 someArray = 9;\n" +
" NestedJsonObject someNestedObject = 10;\n" +
" message NestedJsonObject {\n" +
" optional string key = 11;\n" +
" }\n" +
" }\n" +
"}", "[Transport Configuration] invalid telemetry proto schema provided! Field 'ts' has invalid label. Field 'ts' should have optional keyword!");
}
@Test
public void testSaveProtoDeviceProfileWithInvalidTelemetrySchemaTsDateType() throws Exception {
testSaveDeviceProfileWithInvalidProtoSchema("syntax =\"proto3\";\n" +
"\n" +
"package schemavalidation;\n" +
"\n" +
"message PostTelemetry {\n" +
" optional int32 ts = 1;\n" +
" Values values = 2;\n" +
" \n" +
" message Values {\n" +
" string key1 = 3;\n" +
" bool key2 = 4;\n" +
" double key3 = 5;\n" +
" int32 key4 = 6;\n" +
" JsonObject key5 = 7;\n" +
" }\n" +
" \n" +
" message JsonObject {\n" +
" optional int32 someNumber = 8;\n" +
" }\n" +
"}", "[Transport Configuration] invalid telemetry proto schema provided! Field 'ts' has invalid data type. Only int64 type is supported!");
}
@Test
public void testSaveProtoDeviceProfileWithInvalidTelemetrySchemaValuesDateType() throws Exception {
testSaveDeviceProfileWithInvalidProtoSchema("syntax =\"proto3\";\n" +
"\n" +
"package schemavalidation;\n" +
"\n" +
"message PostTelemetry {\n" +
" optional int64 ts = 1;\n" +
" string values = 2;\n" +
" \n" +
"}", "[Transport Configuration] invalid telemetry proto schema provided! Field 'values' has invalid data type. Only message type is supported!");
}
private DeviceProfile testSaveDeviceProfileWithProtoPayloadType(String schema) throws Exception {
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = this.createProtoTransportPayloadConfiguration(schema, schema);
MqttDeviceProfileTransportConfiguration mqttDeviceProfileTransportConfiguration = this.createMqttDeviceProfileTransportConfiguration(protoTransportPayloadConfiguration);

View File

@ -42,18 +42,18 @@ public abstract class AbstractTransportIntegrationTest extends AbstractControlle
"package test;\n" +
"\n" +
"message PostTelemetry {\n" +
" string key1 = 1;\n" +
" bool key2 = 2;\n" +
" double key3 = 3;\n" +
" int32 key4 = 4;\n" +
" optional string key1 = 1;\n" +
" optional bool key2 = 2;\n" +
" optional double key3 = 3;\n" +
" optional int32 key4 = 4;\n" +
" JsonObject key5 = 5;\n" +
"\n" +
" message JsonObject {\n" +
" int32 someNumber = 6;\n" +
" optional int32 someNumber = 6;\n" +
" repeated int32 someArray = 7;\n" +
" NestedJsonObject someNestedObject = 8;\n" +
" optional NestedJsonObject someNestedObject = 8;\n" +
" message NestedJsonObject {\n" +
" string key = 9;\n" +
" optional string key = 9;\n" +
" }\n" +
" }\n" +
"}";
@ -63,18 +63,18 @@ public abstract class AbstractTransportIntegrationTest extends AbstractControlle
"package test;\n" +
"\n" +
"message PostAttributes {\n" +
" string key1 = 1;\n" +
" bool key2 = 2;\n" +
" double key3 = 3;\n" +
" int32 key4 = 4;\n" +
" optional string key1 = 1;\n" +
" optional bool key2 = 2;\n" +
" optional double key3 = 3;\n" +
" optional int32 key4 = 4;\n" +
" JsonObject key5 = 5;\n" +
"\n" +
" message JsonObject {\n" +
" int32 someNumber = 6;\n" +
" optional int32 someNumber = 6;\n" +
" repeated int32 someArray = 7;\n" +
" NestedJsonObject someNestedObject = 8;\n" +
" message NestedJsonObject {\n" +
" string key = 9;\n" +
" optional string key = 9;\n" +
" }\n" +
" }\n" +
"}";
@ -83,16 +83,16 @@ public abstract class AbstractTransportIntegrationTest extends AbstractControlle
"package rpc;\n" +
"\n" +
"message RpcResponseMsg {\n" +
" string payload = 1;\n" +
" optional string payload = 1;\n" +
"}";
protected static final String DEVICE_RPC_REQUEST_PROTO_SCHEMA = "syntax =\"proto3\";\n" +
"package rpc;\n" +
"\n" +
"message RpcRequestMsg {\n" +
" string method = 1;\n" +
" int32 requestId = 2;\n" +
" string params = 3;\n" +
" optional string method = 1;\n" +
" optional int32 requestId = 2;\n" +
" optional string params = 3;\n" +
"}";
protected Tenant savedTenant;

View File

@ -61,11 +61,14 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap
@Test
public void testPushAttributes() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processAttributesTest(expectedKeys, PAYLOAD_VALUES_STR.getBytes());
processJsonPayloadAttributesTest(expectedKeys, PAYLOAD_VALUES_STR.getBytes());
}
protected void processAttributesTest(List<String> expectedKeys, byte[] payload) throws Exception {
log.warn("[testPushAttributes] Device: {}, Transport type: {}", savedDevice.getName(), savedDevice.getType());
protected void processJsonPayloadAttributesTest(List<String> expectedKeys, byte[] payload) throws Exception {
processAttributesTest(expectedKeys, payload, false);
}
protected void processAttributesTest(List<String> expectedKeys, byte[] payload, boolean presenceFieldsTest) throws Exception {
CoapClient client = getCoapClient(FeatureType.ATTRIBUTES);
postAttributes(client, payload);
@ -94,7 +97,11 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap
String getAttributesValuesUrl = getAttributesValuesUrl(deviceId, actualKeySet);
List<Map<String, Object>> values = doGetAsyncTyped(getAttributesValuesUrl, new TypeReference<>() {});
assertAttributesValues(values, expectedKeySet);
if (presenceFieldsTest) {
assertAttributesProtoValues(values, actualKeySet);
} else {
assertAttributesValues(values, actualKeySet);
}
String deleteAttributesUrl = "/api/plugins/telemetry/DEVICE/" + deviceId + "/CLIENT_SCOPE?keys=" + String.join(",", actualKeySet);
doDelete(deleteAttributesUrl);
}
@ -108,11 +115,11 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap
}
@SuppressWarnings("unchecked")
protected void assertAttributesValues(List<Map<String, Object>> deviceValues, Set<String> expectedKeySet) throws JsonProcessingException {
protected void assertAttributesValues(List<Map<String, Object>> deviceValues, Set<String> keySet) throws JsonProcessingException {
for (Map<String, Object> map : deviceValues) {
String key = (String) map.get("key");
Object value = map.get("value");
assertTrue(expectedKeySet.contains(key));
assertTrue(keySet.contains(key));
switch (key) {
case "key1":
assertEquals("value1", value);
@ -138,6 +145,35 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap
}
}
private void assertAttributesProtoValues(List<Map<String, Object>> values, Set<String> keySet) {
for (Map<String, Object> map : values) {
String key = (String) map.get("key");
Object value = map.get("value");
assertTrue(keySet.contains(key));
switch (key) {
case "key1":
assertEquals("", value);
break;
case "key2":
assertEquals(false, value);
break;
case "key3":
assertEquals(0.0, value);
break;
case "key4":
assertEquals(0, value);
break;
case "key5":
assertNotNull(value);
assertEquals(2, ((LinkedHashMap) value).size());
assertEquals(Arrays.asList(1, 2, 3), ((LinkedHashMap) value).get("someArray"));
LinkedHashMap<String, String> someNestedObject = (LinkedHashMap) ((LinkedHashMap) value).get("someNestedObject");
assertEquals("value", someNestedObject.get("key"));
break;
}
}
}
private String getAttributesValuesUrl(DeviceId deviceId, Set<String> actualKeySet) {
return "/api/plugins/telemetry/DEVICE/" + deviceId + "/values/attributes/CLIENT_SCOPE?keys=" + String.join(",", actualKeySet);
}

View File

@ -32,7 +32,6 @@ import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadCo
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -48,7 +47,6 @@ public abstract class AbstractCoapAttributesProtoIntegrationTest extends Abstrac
@Test
public void testPushAttributes() throws Exception {
super.processBeforeTest("Test Post Attributes device Proto", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF);
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof CoapDeviceProfileTransportConfiguration);
CoapDeviceProfileTransportConfiguration coapTransportConfiguration = (CoapDeviceProfileTransportConfiguration) transportConfiguration;
@ -87,7 +85,50 @@ public abstract class AbstractCoapAttributesProtoIntegrationTest extends Abstrac
.setField(postAttributesMsgDescriptor.findFieldByName("key4"), 4)
.setField(postAttributesMsgDescriptor.findFieldByName("key5"), jsonObject)
.build();
processAttributesTest(expectedKeys, postAttributesMsg.toByteArray());
processAttributesTest(Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false);
}
@Test
public void testPushAttributesWithExplicitPresenceProtoKeys() throws Exception {
super.processBeforeTest("Test Post Attributes device Proto", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof CoapDeviceProfileTransportConfiguration);
CoapDeviceProfileTransportConfiguration coapTransportConfiguration = (CoapDeviceProfileTransportConfiguration) transportConfiguration;
CoapDeviceTypeConfiguration coapDeviceTypeConfiguration = coapTransportConfiguration.getCoapDeviceTypeConfiguration();
assertTrue(coapDeviceTypeConfiguration instanceof DefaultCoapDeviceTypeConfiguration);
DefaultCoapDeviceTypeConfiguration defaultCoapDeviceTypeConfiguration = (DefaultCoapDeviceTypeConfiguration) coapDeviceTypeConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = defaultCoapDeviceTypeConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_ATTRIBUTES_PROTO_SCHEMA);
DynamicSchema attributesSchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchemaFile, ProtoTransportPayloadConfiguration.ATTRIBUTES_PROTO_SCHEMA);
DynamicMessage.Builder nestedJsonObjectBuilder = attributesSchema.newMessageBuilder("PostAttributes.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
assertNotNull(nestedJsonObjectBuilderDescriptor);
DynamicMessage nestedJsonObject = nestedJsonObjectBuilder.setField(nestedJsonObjectBuilderDescriptor.findFieldByName("key"), "value").build();
DynamicMessage.Builder jsonObjectBuilder = attributesSchema.newMessageBuilder("PostAttributes.JsonObject");
Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType();
assertNotNull(jsonObjectBuilderDescriptor);
DynamicMessage jsonObject = jsonObjectBuilder
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3)
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNestedObject"), nestedJsonObject)
.build();
DynamicMessage.Builder postAttributesBuilder = attributesSchema.newMessageBuilder("PostAttributes");
Descriptors.Descriptor postAttributesMsgDescriptor = postAttributesBuilder.getDescriptorForType();
assertNotNull(postAttributesMsgDescriptor);
DynamicMessage postAttributesMsg = postAttributesBuilder
.setField(postAttributesMsgDescriptor.findFieldByName("key1"), "")
.setField(postAttributesMsgDescriptor.findFieldByName("key2"), false)
.setField(postAttributesMsgDescriptor.findFieldByName("key3"), 0.0)
.setField(postAttributesMsgDescriptor.findFieldByName("key4"), 0)
.setField(postAttributesMsgDescriptor.findFieldByName("key5"), jsonObject)
.build();
processAttributesTest(Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), true);
}
}

View File

@ -56,18 +56,21 @@ public abstract class AbstractCoapTimeseriesIntegrationTest extends AbstractCoap
@Test
public void testPushTelemetry() throws Exception {
processTestPostTelemetry(null, false);
processJsonPayloadTelemetryTest(PAYLOAD_VALUES_STR.getBytes(), false);
}
@Test
public void testPushTelemetryWithTs() throws Exception {
String payloadStr = "{\"ts\": 10000, \"values\": " + PAYLOAD_VALUES_STR + "}";
processTestPostTelemetry(payloadStr.getBytes(), true);
processJsonPayloadTelemetryTest(payloadStr.getBytes(), true);
}
protected void processTestPostTelemetry(byte[] payloadBytes, boolean withTs) throws Exception {
log.warn("[testPushTelemetry] Device: {}, Transport type: {}", savedDevice.getName(), savedDevice.getType());
protected void processJsonPayloadTelemetryTest(byte[] payloadBytes, boolean withTs) throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processTestPostTelemetry(payloadBytes, expectedKeys, withTs, false);
}
protected void processTestPostTelemetry(byte[] payloadBytes, List<String> expectedKeys, boolean withTs, boolean presenceFieldsTest) throws Exception {
CoapClient coapClient = getCoapClient(FeatureType.TELEMETRY);
postTelemetry(coapClient, payloadBytes);
@ -127,16 +130,22 @@ public abstract class AbstractCoapTimeseriesIntegrationTest extends AbstractCoap
}
assertNotNull(values);
if (withTs) {
assertTs(values, expectedKeys, 10000, 0);
if (presenceFieldsTest) {
if (withTs) {
assertTsForExplicitProtoFieldValues(values, expectedKeys, 10000, 0);
assertExplicitProtoFieldValuesWithTs(values);
} else {
assertExplicitProtoFieldValues(values);
}
} else {
if (withTs) {
assertTs(values, expectedKeys, 10000, 0);
}
assertValues(values, 0);
}
assertValues(values, 0);
}
private void postTelemetry(CoapClient client, byte[] payload) throws IOException, ConnectorException {
if (payload == null) {
payload = PAYLOAD_VALUES_STR.getBytes();
}
CoapResponse coapResponse = client.setTimeout((long) 60000).post(payload, MediaTypeRegistry.APPLICATION_JSON);
assertEquals(CoAP.ResponseCode.CREATED, coapResponse.getCode());
}
@ -174,5 +183,39 @@ public abstract class AbstractCoapTimeseriesIntegrationTest extends AbstractCoap
}
}
private void assertExplicitProtoFieldValues(Map<String, List<Map<String, Object>>> deviceValues) {
for (Map.Entry<String, List<Map<String, Object>>> entry : deviceValues.entrySet()) {
String key = entry.getKey();
List<Map<String, Object>> tsKv = entry.getValue();
String value = (String) tsKv.get(0).get("value");
switch (key) {
case "key1":
assertEquals("", value);
break;
case "key2":
assertEquals("false", value);
break;
case "key3":
assertEquals("0.0", value);
break;
case "key4":
assertEquals("0", value);
break;
case "key5":
assertEquals("{\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}", value);
break;
}
}
}
private void assertExplicitProtoFieldValuesWithTs(Map<String, List<Map<String, Object>>> deviceValues) {
assertEquals(1, deviceValues.size());
List<Map<String, Object>> tsKv = deviceValues.get("key5");
assertEquals("{\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}", tsKv.get(0).get("value"));
}
private void assertTsForExplicitProtoFieldValues(Map<String, List<Map<String, Object>>> deviceValues, List<String> expectedKeys, int ts, int arrayIndex) {
assertEquals(ts, deviceValues.get(expectedKeys.get(0)).get(arrayIndex).get("ts"));
}
}

View File

@ -32,6 +32,9 @@ import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportC
import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
import java.util.Arrays;
import java.util.Collections;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -84,7 +87,7 @@ public abstract class AbstractCoapTimeseriesProtoIntegrationTest extends Abstrac
.setField(postTelemetryMsgDescriptor.findFieldByName("key4"), 4)
.setField(postTelemetryMsgDescriptor.findFieldByName("key5"), jsonObject)
.build();
processTestPostTelemetry(postTelemetryMsg.toByteArray(), false);
processTestPostTelemetry(postTelemetryMsg.toByteArray(), Arrays.asList("key1", "key2", "key3", "key4", "key5"), false, false);
}
@Test
@ -94,23 +97,23 @@ public abstract class AbstractCoapTimeseriesProtoIntegrationTest extends Abstrac
"package test;\n" +
"\n" +
"message PostTelemetry {\n" +
" int64 ts = 1;\n" +
" optional int64 ts = 1;\n" +
" Values values = 2;\n" +
" \n" +
" message Values {\n" +
" string key1 = 3;\n" +
" bool key2 = 4;\n" +
" double key3 = 5;\n" +
" int32 key4 = 6;\n" +
" optional string key1 = 3;\n" +
" optional bool key2 = 4;\n" +
" optional double key3 = 5;\n" +
" optional int32 key4 = 6;\n" +
" JsonObject key5 = 7;\n" +
" }\n" +
" \n" +
" message JsonObject {\n" +
" int32 someNumber = 8;\n" +
" optional int32 someNumber = 8;\n" +
" repeated int32 someArray = 9;\n" +
" NestedJsonObject someNestedObject = 10;\n" +
" message NestedJsonObject {\n" +
" string key = 11;\n" +
" optional string key = 11;\n" +
" }\n" +
" }\n" +
"}";
@ -164,7 +167,126 @@ public abstract class AbstractCoapTimeseriesProtoIntegrationTest extends Abstrac
.setField(postTelemetryMsgDescriptor.findFieldByName("values"), valuesMsg)
.build();
processTestPostTelemetry(postTelemetryMsg.toByteArray(), true);
processTestPostTelemetry(postTelemetryMsg.toByteArray(), Arrays.asList("key1", "key2", "key3", "key4", "key5"), true, false);
}
@Test
public void testPushTelemetryWithExplicitPresenceProtoKeys() throws Exception {
super.processBeforeTest("Test Post Telemetry device proto payload", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof CoapDeviceProfileTransportConfiguration);
CoapDeviceProfileTransportConfiguration coapDeviceProfileTransportConfiguration = (CoapDeviceProfileTransportConfiguration) transportConfiguration;
CoapDeviceTypeConfiguration coapDeviceTypeConfiguration = coapDeviceProfileTransportConfiguration.getCoapDeviceTypeConfiguration();
assertTrue(coapDeviceTypeConfiguration instanceof DefaultCoapDeviceTypeConfiguration);
DefaultCoapDeviceTypeConfiguration defaultCoapDeviceTypeConfiguration = (DefaultCoapDeviceTypeConfiguration) coapDeviceTypeConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = defaultCoapDeviceTypeConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_TELEMETRY_PROTO_SCHEMA);
DynamicSchema telemetrySchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema");
DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
assertNotNull(nestedJsonObjectBuilderDescriptor);
DynamicMessage nestedJsonObject = nestedJsonObjectBuilder.setField(nestedJsonObjectBuilderDescriptor.findFieldByName("key"), "value").build();
DynamicMessage.Builder jsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject");
Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType();
assertNotNull(jsonObjectBuilderDescriptor);
DynamicMessage jsonObject = jsonObjectBuilder
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3)
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNestedObject"), nestedJsonObject)
.build();
DynamicMessage.Builder postTelemetryBuilder = telemetrySchema.newMessageBuilder("PostTelemetry");
Descriptors.Descriptor postTelemetryMsgDescriptor = postTelemetryBuilder.getDescriptorForType();
assertNotNull(postTelemetryMsgDescriptor);
DynamicMessage postTelemetryMsg = postTelemetryBuilder
.setField(postTelemetryMsgDescriptor.findFieldByName("key1"), "")
.setField(postTelemetryMsgDescriptor.findFieldByName("key2"), false)
.setField(postTelemetryMsgDescriptor.findFieldByName("key3"), 0.0)
.setField(postTelemetryMsgDescriptor.findFieldByName("key4"), 0)
.setField(postTelemetryMsgDescriptor.findFieldByName("key5"), jsonObject)
.build();
processTestPostTelemetry(postTelemetryMsg.toByteArray(), Arrays.asList("key1", "key2", "key3", "key4", "key5"), false, true);
}
@Test
public void testPushTelemetryWithTsAndNoPresenceFields() throws Exception {
String schemaStr = "syntax =\"proto3\";\n" +
"\n" +
"package test;\n" +
"\n" +
"message PostTelemetry {\n" +
" optional int64 ts = 1;\n" +
" Values values = 2;\n" +
" \n" +
" message Values {\n" +
" string key1 = 3;\n" +
" bool key2 = 4;\n" +
" double key3 = 5;\n" +
" int32 key4 = 6;\n" +
" JsonObject key5 = 7;\n" +
" }\n" +
" \n" +
" message JsonObject {\n" +
" optional int32 someNumber = 8;\n" +
" repeated int32 someArray = 9;\n" +
" NestedJsonObject someNestedObject = 10;\n" +
" message NestedJsonObject {\n" +
" optional string key = 11;\n" +
" }\n" +
" }\n" +
"}";
super.processBeforeTest("Test Post Telemetry device proto payload", CoapDeviceType.DEFAULT, TransportPayloadType.PROTOBUF, schemaStr, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof CoapDeviceProfileTransportConfiguration);
CoapDeviceProfileTransportConfiguration coapDeviceProfileTransportConfiguration = (CoapDeviceProfileTransportConfiguration) transportConfiguration;
CoapDeviceTypeConfiguration coapDeviceTypeConfiguration = coapDeviceProfileTransportConfiguration.getCoapDeviceTypeConfiguration();
assertTrue(coapDeviceTypeConfiguration instanceof DefaultCoapDeviceTypeConfiguration);
DefaultCoapDeviceTypeConfiguration defaultCoapDeviceTypeConfiguration = (DefaultCoapDeviceTypeConfiguration) coapDeviceTypeConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = defaultCoapDeviceTypeConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(schemaStr);
DynamicSchema telemetrySchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema");
DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
assertNotNull(nestedJsonObjectBuilderDescriptor);
DynamicMessage nestedJsonObject = nestedJsonObjectBuilder.setField(nestedJsonObjectBuilderDescriptor.findFieldByName("key"), "value").build();
DynamicMessage.Builder jsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject");
Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType();
assertNotNull(jsonObjectBuilderDescriptor);
DynamicMessage jsonObject = jsonObjectBuilder
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3)
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNestedObject"), nestedJsonObject)
.build();
DynamicMessage.Builder valuesBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.Values");
Descriptors.Descriptor valuesDescriptor = valuesBuilder.getDescriptorForType();
assertNotNull(valuesDescriptor);
DynamicMessage valuesMsg = valuesBuilder
.setField(valuesDescriptor.findFieldByName("key4"), 0)
.setField(valuesDescriptor.findFieldByName("key5"), jsonObject)
.build();
DynamicMessage.Builder postTelemetryBuilder = telemetrySchema.newMessageBuilder("PostTelemetry");
Descriptors.Descriptor postTelemetryMsgDescriptor = postTelemetryBuilder.getDescriptorForType();
assertNotNull(postTelemetryMsgDescriptor);
DynamicMessage postTelemetryMsg = postTelemetryBuilder
.setField(postTelemetryMsgDescriptor.findFieldByName("ts"), 10000L)
.setField(postTelemetryMsgDescriptor.findFieldByName("values"), valuesMsg)
.build();
processTestPostTelemetry(postTelemetryMsg.toByteArray(), Collections.singletonList("key5"), true, true);
}
}

View File

@ -55,13 +55,13 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
}
@Test
public void testPushMqttAttributes() throws Exception {
public void testPushAttributes() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
}
@Test
public void testPushMqttAttributesGateway() throws Exception {
public void testPushAttributesGateway() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
String deviceName1 = "Device A";
String deviceName2 = "Device B";
@ -69,7 +69,11 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
processGatewayAttributesTest(expectedKeys, payload.getBytes(), deviceName1, deviceName2);
}
protected void processAttributesTest(String topic, List<String> expectedKeys, byte[] payload) throws Exception {
protected void processJsonPayloadAttributesTest(String topic, List<String> expectedKeys, byte[] payload) throws Exception {
processAttributesTest(topic, expectedKeys, payload, false);
}
protected void processAttributesTest(String topic, List<String> expectedKeys, byte[] payload, boolean presenceFieldsTest) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
publishMqttMsg(client, payload, topic);
@ -98,7 +102,11 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
String getAttributesValuesUrl = getAttributesValuesUrl(deviceId, actualKeySet);
List<Map<String, Object>> values = doGetAsyncTyped(getAttributesValuesUrl, new TypeReference<>() {});
assertAttributesValues(values, expectedKeySet);
if (presenceFieldsTest) {
assertAttributesProtoValues(values, actualKeySet);
} else {
assertAttributesValues(values, actualKeySet);
}
String deleteAttributesUrl = "/api/plugins/telemetry/DEVICE/" + deviceId + "/CLIENT_SCOPE?keys=" + String.join(",", actualKeySet);
doDelete(deleteAttributesUrl);
}
@ -145,11 +153,11 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
}
@SuppressWarnings("unchecked")
protected void assertAttributesValues(List<Map<String, Object>> deviceValues, Set<String> expectedKeySet) throws JsonProcessingException {
protected void assertAttributesValues(List<Map<String, Object>> deviceValues, Set<String> keySet) throws JsonProcessingException {
for (Map<String, Object> map : deviceValues) {
String key = (String) map.get("key");
Object value = map.get("value");
assertTrue(expectedKeySet.contains(key));
assertTrue(keySet.contains(key));
switch (key) {
case "key1":
assertEquals("value1", value);
@ -175,6 +183,35 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
}
}
private void assertAttributesProtoValues(List<Map<String, Object>> values, Set<String> keySet) {
for (Map<String, Object> map : values) {
String key = (String) map.get("key");
Object value = map.get("value");
assertTrue(keySet.contains(key));
switch (key) {
case "key1":
assertEquals("", value);
break;
case "key2":
assertEquals(false, value);
break;
case "key3":
assertEquals(0.0, value);
break;
case "key4":
assertEquals(0, value);
break;
case "key5":
assertNotNull(value);
assertEquals(2, ((LinkedHashMap) value).size());
assertEquals(Arrays.asList(1, 2, 3), ((LinkedHashMap) value).get("someArray"));
LinkedHashMap<String, String> someNestedObject = (LinkedHashMap) ((LinkedHashMap) value).get("someNestedObject");
assertEquals("value", someNestedObject.get("key"));
break;
}
}
}
protected String getGatewayAttributesJsonPayload(String deviceA, String deviceB) {
return "{\"" + deviceA + "\": " + PAYLOAD_VALUES_STR + ", \"" + deviceB + "\": " + PAYLOAD_VALUES_STR + "}";
}

View File

@ -40,13 +40,13 @@ public abstract class AbstractMqttAttributesJsonIntegrationTest extends Abstract
}
@Test
public void testPushMqttAttributes() throws Exception {
public void testPushAttributes() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
processJsonPayloadAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
}
@Test
public void testPushMqttAttributesGateway() throws Exception {
public void testPushAttributesGateway() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
String deviceName1 = "Device A";
String deviceName2 = "Device B";

View File

@ -47,9 +47,8 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac
}
@Test
public void testPushMqttAttributes() throws Exception {
public void testPushAttributes() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC);
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
@ -85,11 +84,51 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac
.setField(postAttributesMsgDescriptor.findFieldByName("key4"), 4)
.setField(postAttributesMsgDescriptor.findFieldByName("key5"), jsonObject)
.build();
processAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, expectedKeys, postAttributesMsg.toByteArray());
processAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false);
}
@Test
public void testPushMqttAttributesGateway() throws Exception {
public void testPushAttributesWithExplicitPresenceProtoKeys() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, POST_DATA_ATTRIBUTES_TOPIC);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchemaFile = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_ATTRIBUTES_PROTO_SCHEMA);
DynamicSchema attributesSchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchemaFile, ProtoTransportPayloadConfiguration.ATTRIBUTES_PROTO_SCHEMA);
DynamicMessage.Builder nestedJsonObjectBuilder = attributesSchema.newMessageBuilder("PostAttributes.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
assertNotNull(nestedJsonObjectBuilderDescriptor);
DynamicMessage nestedJsonObject = nestedJsonObjectBuilder.setField(nestedJsonObjectBuilderDescriptor.findFieldByName("key"), "value").build();
DynamicMessage.Builder jsonObjectBuilder = attributesSchema.newMessageBuilder("PostAttributes.JsonObject");
Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType();
assertNotNull(jsonObjectBuilderDescriptor);
DynamicMessage jsonObject = jsonObjectBuilder
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3)
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNestedObject"), nestedJsonObject)
.build();
DynamicMessage.Builder postAttributesBuilder = attributesSchema.newMessageBuilder("PostAttributes");
Descriptors.Descriptor postAttributesMsgDescriptor = postAttributesBuilder.getDescriptorForType();
assertNotNull(postAttributesMsgDescriptor);
DynamicMessage postAttributesMsg = postAttributesBuilder
.setField(postAttributesMsgDescriptor.findFieldByName("key1"), "")
.setField(postAttributesMsgDescriptor.findFieldByName("key2"), false)
.setField(postAttributesMsgDescriptor.findFieldByName("key3"), 0.0)
.setField(postAttributesMsgDescriptor.findFieldByName("key4"), 0)
.setField(postAttributesMsgDescriptor.findFieldByName("key5"), jsonObject)
.build();
processAttributesTest(POST_DATA_ATTRIBUTES_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), true);
}
@Test
public void testPushAttributesGateway() throws Exception {
super.processBeforeTest("Test Post Attributes device", "Test Post Attributes gateway", TransportPayloadType.PROTOBUF, null, null);
TransportApiProtos.GatewayAttributesMsg.Builder gatewayAttributesMsgProtoBuilder = TransportApiProtos.GatewayAttributesMsg.newBuilder();
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");

View File

@ -61,20 +61,20 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
}
@Test
public void testPushMqttTelemetry() throws Exception {
public void testPushTelemetry() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processTelemetryTest(MqttTopics.DEVICE_TELEMETRY_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes(), false);
processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes(), false);
}
@Test
public void testPushMqttTelemetryWithTs() throws Exception {
public void testPushTelemetryWithTs() throws Exception {
String payloadStr = "{\"ts\": 10000, \"values\": " + PAYLOAD_VALUES_STR + "}";
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processTelemetryTest(MqttTopics.DEVICE_TELEMETRY_TOPIC, expectedKeys, payloadStr.getBytes(), true);
processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_TOPIC, expectedKeys, payloadStr.getBytes(), true);
}
@Test
public void testPushMqttTelemetryGateway() throws Exception {
public void testPushTelemetryGateway() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
String deviceName1 = "Device A";
String deviceName2 = "Device B";
@ -97,7 +97,11 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
assertNotNull(device);
}
protected void processTelemetryTest(String topic, List<String> expectedKeys, byte[] payload, boolean withTs) throws Exception {
protected void processJsonPayloadTelemetryTest(String topic, List<String> expectedKeys, byte[] payload, boolean withTs) throws Exception {
processTelemetryTest(topic, expectedKeys, payload, withTs, false);
}
protected void processTelemetryTest(String topic, List<String> expectedKeys, byte[] payload, boolean withTs, boolean presenceFieldsTest) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
publishMqttMsg(client, payload, topic);
@ -157,10 +161,19 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
}
assertNotNull(values);
if (withTs) {
assertTs(values, expectedKeys, 10000, 0);
if (presenceFieldsTest) {
if (withTs) {
assertTsForExplicitProtoFieldValues(values, expectedKeys, 10000, 0);
assertExplicitProtoFieldValuesWithTs(values);
} else {
assertExplicitProtoFieldValues(values);
}
} else {
if (withTs) {
assertTs(values, expectedKeys, 10000, 0);
}
assertValues(values, 0);
}
assertValues(values, 0);
}
protected void processGatewayTelemetryTest(String topic, List<String> expectedKeys, byte[] payload, String firstDeviceName, String secondDeviceName) throws Exception {
@ -254,6 +267,41 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
}
}
private void assertExplicitProtoFieldValues(Map<String, List<Map<String, Object>>> deviceValues) {
for (Map.Entry<String, List<Map<String, Object>>> entry : deviceValues.entrySet()) {
String key = entry.getKey();
List<Map<String, Object>> tsKv = entry.getValue();
String value = (String) tsKv.get(0).get("value");
switch (key) {
case "key1":
assertEquals("", value);
break;
case "key2":
assertEquals("false", value);
break;
case "key3":
assertEquals("0.0", value);
break;
case "key4":
assertEquals("0", value);
break;
case "key5":
assertEquals("{\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}", value);
break;
}
}
}
private void assertExplicitProtoFieldValuesWithTs(Map<String, List<Map<String, Object>>> deviceValues) {
assertEquals(1, deviceValues.size());
List<Map<String, Object>> tsKv = deviceValues.get("key5");
assertEquals("{\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}", tsKv.get(0).get("value"));
}
private void assertTsForExplicitProtoFieldValues(Map<String, List<Map<String, Object>>> deviceValues, List<String> expectedKeys, int ts, int arrayIndex) {
assertEquals(ts, deviceValues.get(expectedKeys.get(0)).get(arrayIndex).get("ts"));
}
private void assertTs(Map<String, List<Map<String, Object>>> deviceValues, List<String> expectedKeys, int ts, int arrayIndex) {
assertEquals(ts, deviceValues.get(expectedKeys.get(0)).get(arrayIndex).get("ts"));
assertEquals(ts, deviceValues.get(expectedKeys.get(1)).get(arrayIndex).get("ts"));

View File

@ -45,20 +45,20 @@ public abstract class AbstractMqttTimeseriesJsonIntegrationTest extends Abstract
}
@Test
public void testPushMqttTelemetry() throws Exception {
public void testPushTelemetry() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processTelemetryTest(POST_DATA_TELEMETRY_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes(), false);
processJsonPayloadTelemetryTest(POST_DATA_TELEMETRY_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes(), false);
}
@Test
public void testPushMqttTelemetryWithTs() throws Exception {
public void testPushTelemetryWithTs() throws Exception {
String payloadStr = "{\"ts\": 10000, \"values\": " + PAYLOAD_VALUES_STR + "}";
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processTelemetryTest(POST_DATA_TELEMETRY_TOPIC, expectedKeys, payloadStr.getBytes(), true);
processJsonPayloadTelemetryTest(POST_DATA_TELEMETRY_TOPIC, expectedKeys, payloadStr.getBytes(), true);
}
@Test
public void testPushMqttTelemetryGateway() throws Exception {
public void testPushTelemetryGateway() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
String deviceName1 = "Device A";
String deviceName2 = "Device B";

View File

@ -22,6 +22,7 @@ import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfileProvisionType;
@ -35,6 +36,7 @@ import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertNotNull;
@ -51,9 +53,8 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
}
@Test
public void testPushMqttTelemetry() throws Exception {
public void testPushTelemetry() throws Exception {
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null);
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
@ -89,38 +90,37 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
.setField(postTelemetryMsgDescriptor.findFieldByName("key4"), 4)
.setField(postTelemetryMsgDescriptor.findFieldByName("key5"), jsonObject)
.build();
processTelemetryTest(POST_DATA_TELEMETRY_TOPIC, expectedKeys, postTelemetryMsg.toByteArray(), false);
processTelemetryTest(POST_DATA_TELEMETRY_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), false, false);
}
@Test
public void testPushMqttTelemetryWithTs() throws Exception {
public void testPushTelemetryWithTs() throws Exception {
String schemaStr = "syntax =\"proto3\";\n" +
"\n" +
"package test;\n" +
"\n" +
"message PostTelemetry {\n" +
" int64 ts = 1;\n" +
" optional int64 ts = 1;\n" +
" Values values = 2;\n" +
" \n" +
" message Values {\n" +
" string key1 = 3;\n" +
" bool key2 = 4;\n" +
" double key3 = 5;\n" +
" int32 key4 = 6;\n" +
" optional string key1 = 3;\n" +
" optional bool key2 = 4;\n" +
" optional double key3 = 5;\n" +
" optional int32 key4 = 6;\n" +
" JsonObject key5 = 7;\n" +
" }\n" +
" \n" +
" message JsonObject {\n" +
" int32 someNumber = 8;\n" +
" optional int32 someNumber = 8;\n" +
" repeated int32 someArray = 9;\n" +
" NestedJsonObject someNestedObject = 10;\n" +
" message NestedJsonObject {\n" +
" string key = 11;\n" +
" optional string key = 11;\n" +
" }\n" +
" }\n" +
"}";
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null, schemaStr, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
@ -167,11 +167,124 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
.setField(postTelemetryMsgDescriptor.findFieldByName("values"), valuesMsg)
.build();
processTelemetryTest(POST_DATA_TELEMETRY_TOPIC, expectedKeys, postTelemetryMsg.toByteArray(), true);
processTelemetryTest(POST_DATA_TELEMETRY_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), true, false);
}
@Test
public void testPushMqttTelemetryGateway() throws Exception {
public void testPushTelemetryWithExplicitPresenceProtoKeys() throws Exception {
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(DEVICE_TELEMETRY_PROTO_SCHEMA);
DynamicSchema telemetrySchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema");
DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
assertNotNull(nestedJsonObjectBuilderDescriptor);
DynamicMessage nestedJsonObject = nestedJsonObjectBuilder.setField(nestedJsonObjectBuilderDescriptor.findFieldByName("key"), "value").build();
DynamicMessage.Builder jsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject");
Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType();
assertNotNull(jsonObjectBuilderDescriptor);
DynamicMessage jsonObject = jsonObjectBuilder
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3)
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNestedObject"), nestedJsonObject)
.build();
DynamicMessage.Builder postTelemetryBuilder = telemetrySchema.newMessageBuilder("PostTelemetry");
Descriptors.Descriptor postTelemetryMsgDescriptor = postTelemetryBuilder.getDescriptorForType();
assertNotNull(postTelemetryMsgDescriptor);
DynamicMessage postTelemetryMsg = postTelemetryBuilder
.setField(postTelemetryMsgDescriptor.findFieldByName("key1"), "")
.setField(postTelemetryMsgDescriptor.findFieldByName("key2"), false)
.setField(postTelemetryMsgDescriptor.findFieldByName("key3"), 0.0)
.setField(postTelemetryMsgDescriptor.findFieldByName("key4"), 0)
.setField(postTelemetryMsgDescriptor.findFieldByName("key5"), jsonObject)
.build();
processTelemetryTest(POST_DATA_TELEMETRY_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), false, true);
}
@Test
public void testPushTelemetryWithTsAndNoPresenceFields() throws Exception {
String schemaStr = "syntax =\"proto3\";\n" +
"\n" +
"package test;\n" +
"\n" +
"message PostTelemetry {\n" +
" optional int64 ts = 1;\n" +
" Values values = 2;\n" +
" \n" +
" message Values {\n" +
" string key1 = 3;\n" +
" bool key2 = 4;\n" +
" double key3 = 5;\n" +
" int32 key4 = 6;\n" +
" JsonObject key5 = 7;\n" +
" }\n" +
" \n" +
" message JsonObject {\n" +
" optional int32 someNumber = 8;\n" +
" repeated int32 someArray = 9;\n" +
" NestedJsonObject someNestedObject = 10;\n" +
" message NestedJsonObject {\n" +
" optional string key = 11;\n" +
" }\n" +
" }\n" +
"}";
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null, schemaStr, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration mqttTransportConfiguration = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
assertTrue(transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration);
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
ProtoFileElement transportProtoSchema = protoTransportPayloadConfiguration.getTransportProtoSchema(schemaStr);
DynamicSchema telemetrySchema = protoTransportPayloadConfiguration.getDynamicSchema(transportProtoSchema, "telemetrySchema");
DynamicMessage.Builder nestedJsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject.NestedJsonObject");
Descriptors.Descriptor nestedJsonObjectBuilderDescriptor = nestedJsonObjectBuilder.getDescriptorForType();
assertNotNull(nestedJsonObjectBuilderDescriptor);
DynamicMessage nestedJsonObject = nestedJsonObjectBuilder.setField(nestedJsonObjectBuilderDescriptor.findFieldByName("key"), "value").build();
DynamicMessage.Builder jsonObjectBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.JsonObject");
Descriptors.Descriptor jsonObjectBuilderDescriptor = jsonObjectBuilder.getDescriptorForType();
assertNotNull(jsonObjectBuilderDescriptor);
DynamicMessage jsonObject = jsonObjectBuilder
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 1)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 2)
.addRepeatedField(jsonObjectBuilderDescriptor.findFieldByName("someArray"), 3)
.setField(jsonObjectBuilderDescriptor.findFieldByName("someNestedObject"), nestedJsonObject)
.build();
DynamicMessage.Builder valuesBuilder = telemetrySchema.newMessageBuilder("PostTelemetry.Values");
Descriptors.Descriptor valuesDescriptor = valuesBuilder.getDescriptorForType();
assertNotNull(valuesDescriptor);
DynamicMessage valuesMsg = valuesBuilder
.setField(valuesDescriptor.findFieldByName("key4"), 0)
.setField(valuesDescriptor.findFieldByName("key5"), jsonObject)
.build();
DynamicMessage.Builder postTelemetryBuilder = telemetrySchema.newMessageBuilder("PostTelemetry");
Descriptors.Descriptor postTelemetryMsgDescriptor = postTelemetryBuilder.getDescriptorForType();
assertNotNull(postTelemetryMsgDescriptor);
DynamicMessage postTelemetryMsg = postTelemetryBuilder
.setField(postTelemetryMsgDescriptor.findFieldByName("ts"), 10000L)
.setField(postTelemetryMsgDescriptor.findFieldByName("values"), valuesMsg)
.build();
processTelemetryTest(POST_DATA_TELEMETRY_TOPIC, Collections.singletonList("key5"), postTelemetryMsg.toByteArray(), true, true);
}
@Test
public void testPushTelemetryGateway() throws Exception {
super.processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, null, null, null, null, null, null, null, null, DeviceProfileProvisionType.DISABLED);
TransportApiProtos.GatewayTelemetryMsg.Builder gatewayTelemetryMsgProtoBuilder = TransportApiProtos.GatewayTelemetryMsg.newBuilder();
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");

View File

@ -47,6 +47,7 @@ public class ProtoTransportPayloadConfiguration implements TransportPayloadTypeC
public static final String TELEMETRY_PROTO_SCHEMA = "telemetry proto schema";
public static final String RPC_RESPONSE_PROTO_SCHEMA = "rpc response proto schema";
public static final String RPC_REQUEST_PROTO_SCHEMA = "rpc request proto schema";
private static final String PROTO_3_SYNTAX = "proto3";
private String deviceTelemetryProtoSchema;
private String deviceAttributesProtoSchema;
@ -123,6 +124,7 @@ public class ProtoTransportPayloadConfiguration implements TransportPayloadTypeC
public DynamicSchema getDynamicSchema(ProtoFileElement protoFileElement, String schemaName) {
DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
schemaBuilder.setName(schemaName);
schemaBuilder.setSyntax(PROTO_3_SYNTAX);
schemaBuilder.setPackage(!isEmptyStr(protoFileElement.getPackageName()) ?
protoFileElement.getPackageName() : schemaName.toLowerCase());
List<TypeElement> types = protoFileElement.getTypes();

View File

@ -160,7 +160,7 @@ public class ProtoCoapAdaptor implements CoapTransportAdaptor {
private String dynamicMsgToJson(byte[] bytes, Descriptors.Descriptor descriptor) throws InvalidProtocolBufferException {
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, bytes);
return JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage);
return JsonFormat.printer().print(dynamicMessage);
}
}

View File

@ -82,6 +82,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static com.google.protobuf.FieldType.MESSAGE;
import static org.thingsboard.server.common.data.CacheConstants.DEVICE_PROFILE_CACHE;
import static org.thingsboard.server.dao.service.Validator.validateId;
@ -381,6 +382,7 @@ public class DeviceProfileServiceImpl extends AbstractEntityService implements D
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration =
(ProtoTransportPayloadConfiguration) mqttTransportConfiguration.getTransportPayloadTypeConfiguration();
validateProtoSchemas(protoTransportPayloadConfiguration);
validateTelemetryDynamicMessageFields(protoTransportPayloadConfiguration);
validateRpcRequestDynamicMessageFields(protoTransportPayloadConfiguration);
}
} else if (transportConfiguration instanceof CoapDeviceProfileTransportConfiguration) {
@ -392,6 +394,7 @@ public class DeviceProfileServiceImpl extends AbstractEntityService implements D
if (transportPayloadTypeConfiguration instanceof ProtoTransportPayloadConfiguration) {
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
validateProtoSchemas(protoTransportPayloadConfiguration);
validateTelemetryDynamicMessageFields(protoTransportPayloadConfiguration);
validateRpcRequestDynamicMessageFields(protoTransportPayloadConfiguration);
}
}
@ -609,6 +612,33 @@ public class DeviceProfileServiceImpl extends AbstractEntityService implements D
};
private void validateTelemetryDynamicMessageFields(ProtoTransportPayloadConfiguration protoTransportPayloadTypeConfiguration) {
String deviceTelemetryProtoSchema = protoTransportPayloadTypeConfiguration.getDeviceTelemetryProtoSchema();
Descriptors.Descriptor telemetryDynamicMessageDescriptor = protoTransportPayloadTypeConfiguration.getTelemetryDynamicMessageDescriptor(deviceTelemetryProtoSchema);
if (telemetryDynamicMessageDescriptor == null) {
throw new DataValidationException(invalidSchemaProvidedMessage(TELEMETRY_PROTO_SCHEMA) + " Failed to get telemetryDynamicMessageDescriptor!");
} else {
List<Descriptors.FieldDescriptor> fields = telemetryDynamicMessageDescriptor.getFields();
if (CollectionUtils.isEmpty(fields)) {
throw new DataValidationException(invalidSchemaProvidedMessage(TELEMETRY_PROTO_SCHEMA) + " " + telemetryDynamicMessageDescriptor.getName() + " fields is empty!");
} else if (fields.size() == 2) {
Descriptors.FieldDescriptor tsFieldDescriptor = telemetryDynamicMessageDescriptor.findFieldByName("ts");
Descriptors.FieldDescriptor valuesFieldDescriptor = telemetryDynamicMessageDescriptor.findFieldByName("values");
if (tsFieldDescriptor != null && valuesFieldDescriptor != null) {
if (!Descriptors.FieldDescriptor.Type.MESSAGE.equals(valuesFieldDescriptor.getType())) {
throw new DataValidationException(invalidSchemaProvidedMessage(TELEMETRY_PROTO_SCHEMA) + " Field 'values' has invalid data type. Only message type is supported!");
}
if (!Descriptors.FieldDescriptor.Type.INT64.equals(tsFieldDescriptor.getType())) {
throw new DataValidationException(invalidSchemaProvidedMessage(TELEMETRY_PROTO_SCHEMA) + " Field 'ts' has invalid data type. Only int64 type is supported!");
}
if (!tsFieldDescriptor.hasOptionalKeyword()) {
throw new DataValidationException(invalidSchemaProvidedMessage(TELEMETRY_PROTO_SCHEMA) + " Field 'ts' has invalid label. Field 'ts' should have optional keyword!");
}
}
}
}
}
private void validateRpcRequestDynamicMessageFields(ProtoTransportPayloadConfiguration protoTransportPayloadTypeConfiguration) {
DynamicMessage.Builder rpcRequestDynamicMessageBuilder = protoTransportPayloadTypeConfiguration.getRpcRequestDynamicMessageBuilder(protoTransportPayloadTypeConfiguration.getDeviceRpcRequestProtoSchema());
Descriptors.Descriptor rpcRequestDynamicMessageDescriptor = rpcRequestDynamicMessageBuilder.getDescriptorForType();

View File

@ -133,16 +133,16 @@ export const defaultTelemetrySchema =
'\n' +
'message SensorDataReading {\n' +
'\n' +
' double temperature = 1;\n' +
' double humidity = 2;\n' +
' optional double temperature = 1;\n' +
' optional double humidity = 2;\n' +
' InnerObject innerObject = 3;\n' +
'\n' +
' message InnerObject {\n' +
' string key1 = 1;\n' +
' bool key2 = 2;\n' +
' double key3 = 3;\n' +
' int32 key4 = 4;\n' +
' string key5 = 5;\n' +
' optional string key1 = 1;\n' +
' optional bool key2 = 2;\n' +
' optional double key3 = 3;\n' +
' optional int32 key4 = 4;\n' +
' optional string key5 = 5;\n' +
' }\n' +
'}\n';
@ -151,8 +151,8 @@ export const defaultAttributesSchema =
'package attributes;\n' +
'\n' +
'message SensorConfiguration {\n' +
' string firmwareVersion = 1;\n' +
' string serialNumber = 2;\n' +
' optional string firmwareVersion = 1;\n' +
' optional string serialNumber = 2;\n' +
'}';
export const defaultRpcRequestSchema =
@ -160,9 +160,9 @@ export const defaultRpcRequestSchema =
'package rpc;\n' +
'\n' +
'message RpcRequestMsg {\n' +
' string method = 1;\n' +
' int32 requestId = 2;\n' +
' string params = 3;\n' +
' optional string method = 1;\n' +
' optional int32 requestId = 2;\n' +
' optional string params = 3;\n' +
'}';
export const defaultRpcResponseSchema =
@ -170,7 +170,7 @@ export const defaultRpcResponseSchema =
'package rpc;\n' +
'\n' +
'message RpcResponseMsg {\n' +
' string payload = 1;\n' +
' optional string payload = 1;\n' +
'}';
export const coapDeviceTypeTranslationMap = new Map<CoapTransportDeviceType, string>(