sparkplug: add TOPIC_STATE_REGEXP

This commit is contained in:
nick 2025-01-23 17:34:02 +02:00
parent 80284ad7c0
commit b433367526
2 changed files with 7 additions and 6 deletions

View File

@ -60,6 +60,7 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetr
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.fromSparkplugBMetricToKeyValueProto;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.validatedValueByTypeMetric;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_REGEXP;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_STATE_REGEXP;
/**
* Created by nickAS21 on 12.12.22
@ -173,10 +174,9 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
*/
public void handleSparkplugSubscribeMsg(MqttTopicSubscription subscription) throws ThingsboardException {
String topic = subscription.topicFilter();
String[] splitTopic = topic.split(TOPIC_SPLIT_REGEXP);
if (STATE.name().equals(splitTopic[1])) {
if (topic != null && topic.startsWith(TOPIC_STATE_REGEXP)) {
log.trace("Subscribing on its own spBv1.0/STATE/[the Sparkplug Host Application] - Implemented as status via checkSparkplugNodeSession");
} else if (this.validateTopicDataSubscribe(splitTopic)) {
} else if (this.validateTopicDataSubscribe(topic)) {
// TODO if need subscription DATA
log.trace("Subscribing on its own [" + topic + "] - Implemented as SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG/SUBSCRIBE_TO_RPC_ASYNC_MSG via checkSparkplugNode/DeviceSession");
} else {
@ -325,11 +325,12 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
* Subscribe: spBv1.0/G1/DDATA/E1/D1/#
* Subscribe: spBv1.0/G1/DDATA/E1/D1/+
* Parses a Sparkplug MQTT message topic string and returns a {@link SparkplugTopic} instance.
* @param splitTopic a topic string[] UTF-8
* @param topic a topic UTF-8
* @return a {@link SparkplugTopic} instance
* @throws ThingsboardException if an error occurs while parsing
*/
public boolean validateTopicDataSubscribe(String[] splitTopic) throws ThingsboardException {
public boolean validateTopicDataSubscribe(String topic) throws ThingsboardException {
String[] splitTopic = topic.split(TOPIC_SPLIT_REGEXP);
if (splitTopic.length >= 4 && splitTopic.length <= 5 &&
splitTopic[0].equals(this.sparkplugTopicNode.getNamespace()) &&
splitTopic[1].equals(this.sparkplugTopicNode.getGroupId()) &&

View File

@ -35,7 +35,7 @@ public class SparkplugTopicService {
public static final String TOPIC_ROOT_SPB_V_1_0 = "spBv1.0";
public static final String TOPIC_ROOT_CERT_SP = "$sparkplug/certificates/";
public static final String TOPIC_SPLIT_REGEXP = "/";
public static final String TOPIC_STATE_REGEXP = TOPIC_SPLIT_REGEXP + STATE.name() + TOPIC_SPLIT_REGEXP;
public static final String TOPIC_STATE_REGEXP = TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + STATE.name() + TOPIC_SPLIT_REGEXP;
public static SparkplugTopic getSplitTopic(String topic) throws ThingsboardException {
SparkplugTopic sparkplugTopic = SPLIT_TOPIC_CACHE.get(topic);