Merge pull request #6986 from adovh/feature/work-1385-custom-mqtt-subscribe-topic

[3.5] added ability to create&subscribe custom mqtt attributes topics
This commit is contained in:
Andrew Shvayka 2023-04-07 13:48:20 +03:00 committed by GitHub
commit 14ed9df205
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 118 additions and 19 deletions

View File

@ -536,7 +536,19 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
protected MqttDeviceProfileTransportConfiguration createMqttDeviceProfileTransportConfiguration(TransportPayloadTypeConfiguration transportPayloadTypeConfiguration, boolean sendAckOnValidationException) {
MqttDeviceProfileTransportConfiguration mqttDeviceProfileTransportConfiguration = new MqttDeviceProfileTransportConfiguration();
mqttDeviceProfileTransportConfiguration.setDeviceTelemetryTopic(MqttTopics.DEVICE_TELEMETRY_TOPIC);
mqttDeviceProfileTransportConfiguration.setDeviceTelemetryTopic(MqttTopics.DEVICE_ATTRIBUTES_TOPIC);
mqttDeviceProfileTransportConfiguration.setDeviceAttributesTopic(MqttTopics.DEVICE_ATTRIBUTES_TOPIC);
mqttDeviceProfileTransportConfiguration.setDeviceAttributesSubscribeTopic(MqttTopics.DEVICE_ATTRIBUTES_TOPIC);
mqttDeviceProfileTransportConfiguration.setSendAckOnValidationException(sendAckOnValidationException);
mqttDeviceProfileTransportConfiguration.setTransportPayloadTypeConfiguration(transportPayloadTypeConfiguration);
return mqttDeviceProfileTransportConfiguration;
}
protected MqttDeviceProfileTransportConfiguration createMqttDeviceProfileTransportConfiguration(TransportPayloadTypeConfiguration transportPayloadTypeConfiguration, boolean sendAckOnValidationException,
String telemetryTopic, String attributesPublishTopic, String attributesSubscribeTopic) {
MqttDeviceProfileTransportConfiguration mqttDeviceProfileTransportConfiguration = new MqttDeviceProfileTransportConfiguration();
mqttDeviceProfileTransportConfiguration.setDeviceTelemetryTopic(telemetryTopic);
mqttDeviceProfileTransportConfiguration.setDeviceAttributesTopic(attributesPublishTopic);
mqttDeviceProfileTransportConfiguration.setDeviceAttributesSubscribeTopic(attributesSubscribeTopic);
mqttDeviceProfileTransportConfiguration.setSendAckOnValidationException(sendAckOnValidationException);
mqttDeviceProfileTransportConfiguration.setTransportPayloadTypeConfiguration(transportPayloadTypeConfiguration);
return mqttDeviceProfileTransportConfiguration;

View File

@ -945,6 +945,28 @@ public abstract class BaseDeviceProfileControllerTest extends AbstractController
Assert.assertEquals(savedDeviceProfile, foundDeviceProfile);
}
@Test
public void testSaveDeviceProfileWorks() throws Exception {
JsonTransportPayloadConfiguration jsonTransportPayloadConfiguration = new JsonTransportPayloadConfiguration();
MqttDeviceProfileTransportConfiguration mqttDeviceProfileTransportConfiguration =
this.createMqttDeviceProfileTransportConfiguration(jsonTransportPayloadConfiguration, true,
"v1/devices/me/telemetry", "v1/devices/me/attributes", "v1/devices/me/subscribeattributes");
DeviceProfile deviceProfile = this.createDeviceProfile("Device Profile",
mqttDeviceProfileTransportConfiguration);
DeviceProfile savedDeviceProfile = doPost("/api/deviceProfile", deviceProfile, DeviceProfile.class);
Assert.assertNotNull(savedDeviceProfile);
Assert.assertEquals(savedDeviceProfile.getTransportType(), DeviceTransportType.MQTT);
Assert.assertTrue(savedDeviceProfile.getProfileData().getTransportConfiguration() instanceof MqttDeviceProfileTransportConfiguration);
MqttDeviceProfileTransportConfiguration transportConfiguration =
(MqttDeviceProfileTransportConfiguration) savedDeviceProfile.getProfileData().getTransportConfiguration();
Assert.assertTrue(transportConfiguration.isSendAckOnValidationException());
DeviceProfile foundDeviceProfile =
doGet("/api/deviceProfile/" + savedDeviceProfile.getId().getId().toString(), DeviceProfile.class);
Assert.assertEquals(savedDeviceProfile.getProfileData().getTransportConfiguration(),
foundDeviceProfile.getProfileData().getTransportConfiguration());
Assert.assertEquals(savedDeviceProfile, foundDeviceProfile);
}
private DeviceProfile testSaveDeviceProfileWithProtoPayloadType(String schema) throws Exception {
ProtoTransportPayloadConfiguration protoTransportPayloadConfiguration = this.createProtoTransportPayloadConfiguration(schema, schema, null, null);
MqttDeviceProfileTransportConfiguration mqttDeviceProfileTransportConfiguration = this.createMqttDeviceProfileTransportConfiguration(protoTransportPayloadConfiguration, false);

View File

@ -18,7 +18,11 @@ package org.thingsboard.server.transport.mqtt.mqttv3.attributes.updates;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.mqttv3.attributes.AbstractMqttAttributesIntegrationTest;
@ -46,6 +50,23 @@ public class MqttAttributesUpdatesIntegrationTest extends AbstractMqttAttributes
processJsonTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_TOPIC);
}
@Test
public void testJsonSubscribeToAttributesUpdatesFromTheServerOnCustomTopic() throws Exception {
Device tmp = savedDevice;
String customTopic = "v1/devices/me/subscribeattributes";
JsonTransportPayloadConfiguration jsonTransportPayloadConfiguration = new JsonTransportPayloadConfiguration();
MqttDeviceProfileTransportConfiguration mqttDeviceProfileTransportConfiguration =
this.createMqttDeviceProfileTransportConfiguration(jsonTransportPayloadConfiguration, true,
"v1/devices/me/telemetry", "v1/devices/me/attributes", customTopic);
DeviceProfile deviceProfile = this.createDeviceProfile("New device Profile",
mqttDeviceProfileTransportConfiguration);
DeviceProfile savedProfile = doPost("/api/deviceProfile", deviceProfile, DeviceProfile.class);
savedDevice.setDeviceProfileId(savedProfile.getId());
doPost("/api/device", savedDevice);
processJsonTestSubscribeToAttributesUpdates(customTopic);
savedDevice = tmp;
}
@Test
public void testJsonSubscribeToAttributesUpdatesFromTheServerOnShortTopic() throws Exception {
processJsonTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_SHORT_TOPIC);

View File

@ -28,6 +28,9 @@ public class MqttDeviceProfileTransportConfiguration implements DeviceProfileTra
private String deviceTelemetryTopic = MqttTopics.DEVICE_TELEMETRY_TOPIC;
@NoXss
private String deviceAttributesTopic = MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
@NoXss
private String deviceAttributesSubscribeTopic = MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
private TransportPayloadTypeConfiguration transportPayloadTypeConfiguration;
private boolean sparkplug;
private Set<String> sparkplugAttributesMetricNames;

View File

@ -687,6 +687,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
String topic = subscription.topicName();
MqttQoS reqQoS = subscription.qualityOfService();
if (deviceSessionCtx.isDeviceSubscriptionAttributesTopic(topic)){
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true;
continue;
}
try {
if (sparkplugSessionHandler != null) {
sparkplugSessionHandler.handleSparkplugSubscribeMsg(grantedQoSList, subscription, reqQoS);

View File

@ -80,7 +80,8 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
private MqttVersion mqttVersion;
private volatile MqttTopicFilter telemetryTopicFilter = MqttTopicFilterFactory.getDefaultTelemetryFilter();
private volatile MqttTopicFilter attributesTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
private volatile MqttTopicFilter attributesPublishTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
private volatile MqttTopicFilter attributesSubscribeTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
private volatile TransportPayloadType payloadType = TransportPayloadType.JSON;
private volatile Descriptors.Descriptor attributesDynamicMessageDescriptor;
private volatile Descriptors.Descriptor telemetryDynamicMessageDescriptor;
@ -110,7 +111,11 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
}
public boolean isDeviceAttributesTopic(String topicName) {
return attributesTopicFilter.filter(topicName);
return attributesPublishTopicFilter.filter(topicName);
}
public boolean isDeviceSubscriptionAttributesTopic(String topicName) {
return attributesSubscribeTopicFilter.filter(topicName);
}
public MqttTransportAdaptor getPayloadAdaptor() {
@ -161,7 +166,8 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttConfig.getTransportPayloadTypeConfiguration();
payloadType = transportPayloadTypeConfiguration.getTransportPayloadType();
telemetryTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceTelemetryTopic());
attributesTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceAttributesTopic());
attributesPublishTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceAttributesTopic());
attributesSubscribeTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceAttributesSubscribeTopic());
sendAckOnValidationException = mqttConfig.isSendAckOnValidationException();
if (TransportPayloadType.PROTOBUF.equals(payloadType)) {
ProtoTransportPayloadConfiguration protoTransportPayloadConfig = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
@ -171,7 +177,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
}
} else {
telemetryTopicFilter = MqttTopicFilterFactory.getDefaultTelemetryFilter();
attributesTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
attributesPublishTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
payloadType = TransportPayloadType.JSON;
sendAckOnValidationException = false;
}

View File

@ -66,6 +66,21 @@
{{ 'device-profile.not-valid-multi-character' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field fxFlex>
<mat-label translate>device-profile.attributes-subscribe-topic-filter</mat-label>
<input matInput required
formControlName="deviceAttributesSubscribeTopic"
type="text">
<mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('deviceAttributesSubscribeTopic').hasError('required')">
{{ 'device-profile.attributes-subscribe-topic-filter-required' | translate}}
</mat-error>
<mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('deviceAttributesSubscribeTopic').hasError('invalidSingleTopicCharacter')">
{{ 'device-profile.not-valid-single-character' | translate}}
</mat-error>
<mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('deviceAttributesSubscribeTopic').hasError('invalidMultiTopicCharacter')">
{{ 'device-profile.not-valid-multi-character' | translate}}
</mat-error>
</mat-form-field>
</div>
<mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.hasError('unique')">
{{ 'device-profile.mqtt-device-topic-filters-unique' | translate }}

View File

@ -91,6 +91,7 @@ export class MqttDeviceProfileTransportConfigurationComponent implements Control
ngOnInit() {
this.mqttDeviceProfileTransportConfigurationFormGroup = this.fb.group({
deviceAttributesTopic: [null, [Validators.required, this.validationMQTTTopic()]],
deviceAttributesSubscribeTopic: [null, [Validators.required, this.validationMQTTTopic()]],
deviceTelemetryTopic: [null, [Validators.required, this.validationMQTTTopic()]],
sparkplug: [false],
sparkplugAttributesMetricNames: [null],

View File

@ -1104,8 +1104,10 @@
"support-level-wildcards": "Jsou podporovány jednoúrovňové <code>[+]</code> a víceúrovňové <code>[#]</code> zástupné znaky.",
"telemetry-topic-filter": "Filtr fronty telemetrie",
"telemetry-topic-filter-required": "Filtr fronty telemetrie je povinný.",
"attributes-topic-filter": "Filtr atributů fronty",
"attributes-topic-filter-required": "Filtr atributů fronty je povinný.",
"attributes-topic-filter": "Attributes publish topic filter",
"attributes-subscribe-topic-filter": "Attributes subscribe topic filter",
"attributes-topic-filter-required": "Attributes publish topic filter is required.",
"attributes-subscribe-topic-filter-required": "Attributes subscribe topic is required",
"telemetry-proto-schema": "Proto schéma telemetrie",
"telemetry-proto-schema-required": "Proto schéma telemetrie je povinné.",
"attributes-proto-schema": "Atributy proto schémata",

View File

@ -1440,8 +1440,10 @@
"support-level-wildcards": "Single <code>[+]</code> and multi-level <code>[#]</code> wildcards supported.",
"telemetry-topic-filter": "Telemetry topic filter",
"telemetry-topic-filter-required": "Telemetry topic filter is required.",
"attributes-topic-filter": "Attributes topic filter",
"attributes-topic-filter-required": "Attributes topic filter is required.",
"attributes-topic-filter": "Attributes publish topic filter",
"attributes-subscribe-topic-filter": "Attributes subscribe topic filter",
"attributes-topic-filter-required": "Attributes publish topic filter is required.",
"attributes-subscribe-topic-filter-required": "Attributes subscribe topic is required",
"telemetry-proto-schema": "Telemetry proto schema",
"telemetry-proto-schema-required": "Telemetry proto schema is required.",
"attributes-proto-schema": "Attributes proto schema",

View File

@ -1298,8 +1298,10 @@
"support-level-wildcards": "Se soportan los wilcards únicos <code>[+]</code> y multi-nivel <code>[#]</code>.",
"telemetry-topic-filter": "Filtro de topic en telemetría",
"telemetry-topic-filter-required": "Se requiere filtro de topic (telemetría).",
"attributes-topic-filter": "Filtro de topic en atributos",
"attributes-topic-filter-required": "Se requiere filtro de topic (atributos).",
"attributes-topic-filter": "Attributes publish topic filter",
"attributes-subscribe-topic-filter": "Attributes subscribe topic filter",
"attributes-topic-filter-required": "Attributes publish topic filter is required.",
"attributes-subscribe-topic-filter-required": "Attributes subscribe topic is required",
"telemetry-proto-schema": "Proto esquema de telemetría",
"telemetry-proto-schema-required": "Se requiere proto esquema de telemetría.",
"attributes-proto-schema": "Proto esquema de atributos",

View File

@ -1129,8 +1129,10 @@
"support-level-wildcards": "<code>[+]</code> unique et wildcards de <code>[#]</code> multi-niveaux supportés.",
"telemetry-topic-filter": "Filtre de sujets de télémétrie",
"telemetry-topic-filter-required": "Filtre de sujets de télémétrie est requis.",
"attributes-topic-filter": "Filtre de sujets d'attributs",
"attributes-topic-filter-required": "Filtre de sujets d'attributs est requis.",
"attributes-topic-filter": "Attributes publish topic filter",
"attributes-subscribe-topic-filter": "Attributes subscribe topic filter",
"attributes-topic-filter-required": "Attributes publish topic filter is required.",
"attributes-subscribe-topic-filter-required": "Attributes subscribe topic is required",
"telemetry-proto-schema": "Schéma proto de télémétrie",
"telemetry-proto-schema-required": "Schéma proto de télémétrie est requis.",
"attributes-proto-schema": "Schéma proto d'attributs",

View File

@ -976,8 +976,10 @@
"support-level-wildcards": "Single <code>[+]</code> and multi-level <code>[#]</code> wildcards supported.",
"telemetry-topic-filter": "Telemetry topic filter",
"telemetry-topic-filter-required": "Telemetry topic filter is required.",
"attributes-topic-filter": "Attributes topic filter",
"attributes-topic-filter-required": "Attributes topic filter is required.",
"attributes-topic-filter": "Attributes publish topic filter",
"attributes-subscribe-topic-filter": "Attributes subscribe topic filter",
"attributes-topic-filter-required": "Attributes publish topic filter is required.",
"attributes-subscribe-topic-filter-required": "Attributes subscribe topic is required",
"telemetry-proto-schema": "Telemetry proto schema",
"telemetry-proto-schema-required": "Telemetry proto schema is required.",
"attributes-proto-schema": "Attributes proto schema",

View File

@ -976,8 +976,10 @@
"support-level-wildcards": "Single <code>[+]</code> and multi-level <code>[#]</code> wildcards supported.",
"telemetry-topic-filter": "Telemetry topic filter",
"telemetry-topic-filter-required": "Telemetry topic filter is required.",
"attributes-topic-filter": "Attributes topic filter",
"attributes-topic-filter-required": "Attributes topic filter is required.",
"attributes-topic-filter": "Attributes publish topic filter",
"attributes-subscribe-topic-filter": "Attributes subscribe topic filter",
"attributes-topic-filter-required": "Attributes publish topic filter is required.",
"attributes-subscribe-topic-filter-required": "Attributes subscribe topic is required",
"telemetry-proto-schema": "Telemetry proto schema",
"telemetry-proto-schema-required": "Telemetry proto schema is required.",
"attributes-proto-schema": "Attributes proto schema",

View File

@ -1107,8 +1107,10 @@
"support-level-wildcards": "Tekli <code>[+]</code> ve çoklu <code>[#]</code> joker karakter destekler.",
"telemetry-topic-filter": "Telemetri konu filtresi",
"telemetry-topic-filter-required": "Telemetri konu filtresi gerekli.",
"attributes-topic-filter": "Öznitelikler konu filtresi",
"attributes-topic-filter-required": "Öznitelikler konu filtresi gerekli.",
"attributes-topic-filter": "Attributes publish topic filter",
"attributes-subscribe-topic-filter": "Attributes subscribe topic filter",
"attributes-topic-filter-required": "Attributes publish topic filter is required.",
"attributes-subscribe-topic-filter-required": "Attributes subscribe topic is required",
"telemetry-proto-schema": "Telemetri proto şeması",
"telemetry-proto-schema-required": "Telemetri proto şeması gerekli.",
"attributes-proto-schema": "Öznitelikler proto şeması",