Merge pull request #9082 from imbeacon/feature/mqtt-topic-in-metadata-for-transport

Feature/mqtt topic in metadata for transport
This commit is contained in:
Andrew Shvayka 2023-08-18 13:07:38 +03:00 committed by GitHub
commit e5658f68df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 104 additions and 14 deletions

View File

@ -124,6 +124,8 @@ public class DataConstants {
public static final String LAST_CONNECTED_GATEWAY = "lastConnectedGateway";
public static final String MQTT_TOPIC = "mqttTopic";
public static final String MAIN_QUEUE_NAME = "Main";
public static final String MAIN_QUEUE_TOPIC = "tb_rule_engine.main";
public static final String HP_QUEUE_NAME = "HighPriority";

View File

@ -59,6 +59,7 @@ import org.thingsboard.server.common.data.id.OtaPackageId;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.msg.EncryptionUtil;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
@ -136,6 +137,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE;
private final UUID sessionId;
protected final MqttTransportContext context;
private final TransportService transportService;
private final SchedulerComponent scheduler;
@ -442,10 +444,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
MqttTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getMetadata(deviceSessionCtx, topicName),
getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getMetadata(deviceSessionCtx, topicName),
getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
@ -466,22 +470,28 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.SOFTWARE);
} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getMetadata(deviceSessionCtx, topicName),
getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getJsonMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getMetadata(deviceSessionCtx, topicName),
getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getProtoMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getMetadata(deviceSessionCtx, topicName),
getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC)) {
TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getMetadata(deviceSessionCtx, topicName),
getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC)) {
TransportProtos.PostAttributeMsg postAttributeMsg = context.getJsonMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getMetadata(deviceSessionCtx, topicName),
getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC)) {
TransportProtos.PostAttributeMsg postAttributeMsg = context.getProtoMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getMetadata(deviceSessionCtx, topicName),
getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC)) {
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getJsonMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
@ -525,6 +535,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private TbMsgMetaData getMetadata(DeviceSessionCtx ctx, String topicName) {
if (ctx.isDeviceProfileMqttTransportType()) {
TbMsgMetaData md = new TbMsgMetaData();
md.putValue(DataConstants.MQTT_TOPIC, topicName);
return md;
} else {
return null;
}
}
private void sendAckOrCloseSession(ChannelHandlerContext ctx, String topicName, int msgId) {
if ((deviceSessionCtx.isSendAckOnValidationException() || MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) && msgId > 0) {
log.debug("[{}] Send pub ack on invalid publish msg [{}][{}]", sessionId, topicName, msgId);

View File

@ -92,10 +92,14 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
private volatile boolean useJsonPayloadFormatForDefaultDownlinkTopics;
private volatile boolean sendAckOnValidationException;
@Getter
private volatile boolean deviceProfileMqttTransportType;
@Getter
@Setter
private TransportPayloadType provisionPayloadType = payloadType;
public DeviceSessionCtx(UUID sessionId, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap, MqttTransportContext context) {
super(sessionId, mqttQoSMap);
this.context = context;
@ -165,6 +169,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
MqttDeviceProfileTransportConfiguration mqttConfig = (MqttDeviceProfileTransportConfiguration) transportConfiguration;
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration = mqttConfig.getTransportPayloadTypeConfiguration();
payloadType = transportPayloadTypeConfiguration.getTransportPayloadType();
deviceProfileMqttTransportType = true;
telemetryTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceTelemetryTopic());
attributesPublishTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceAttributesTopic());
attributesSubscribeTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceAttributesSubscribeTopic());
@ -179,6 +184,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
telemetryTopicFilter = MqttTopicFilterFactory.getDefaultTelemetryFilter();
attributesPublishTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
payloadType = TransportPayloadType.JSON;
deviceProfileMqttTransportType = false;
sendAckOnValidationException = false;
}
updateAdaptor();

View File

@ -16,8 +16,7 @@
package org.thingsboard.server.transport.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.EmptyByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
@ -34,8 +33,19 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.device.profile.DeviceProfileData;
import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
@ -55,12 +65,14 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.willDoNothing;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@Slf4j
@RunWith(MockitoJUnitRunner.class)
@ -81,9 +93,14 @@ public class MqttTransportHandlerTest {
ExecutorService executor;
MqttTransportHandler handler;
@Spy
TransportService transportService;
@Before
public void setUp() throws Exception {
willReturn(MSG_QUEUE_LIMIT).given(context).getMessageQueueSizePerDeviceLimit();
willReturn(transportService).given(context).getTransportService();
handler = spy(new MqttTransportHandler(context, sslHandler));
willReturn(IP_ADDR).given(handler).getAddress(any());
@ -104,9 +121,17 @@ public class MqttTransportHandlerTest {
}
MqttPublishMessage getMqttPublishMessage() {
return getMqttPublishMessage("v1/gateway/telemetry");
}
MqttPublishMessage getDeviceMqttPublishMessage() {
return getMqttPublishMessage("v1/devices/me/telemetry");
}
MqttPublishMessage getMqttPublishMessage(String topicName) {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, true, MqttQoS.AT_LEAST_ONCE, false, 123);
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("v1/gateway/telemetry", packedId.incrementAndGet());
ByteBuf payload = new EmptyByteBuf(new PooledByteBufAllocator());
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topicName, packedId.incrementAndGet());
ByteBuf payload = Unpooled.wrappedBuffer("{\"testKey\":\"testValue\"}".getBytes());
return new MqttPublishMessage(mqttFixedHeader, variableHeader, payload);
}
@ -205,4 +230,26 @@ public class MqttTransportHandlerTest {
messages.forEach((msg) -> verify(handler, times(1)).processRegularSessionMsg(ctx, msg));
}
@Test
public void givenMqttMessage_whenDeviceProfileMqttTransport_thenTopicAddedToMetadata() {
MqttPublishMessage message = getDeviceMqttPublishMessage();
when(context.getJsonMqttAdaptor()).thenReturn(new JsonMqttAdaptor());
handler.deviceSessionCtx.setConnected(true);
DeviceProfile deviceProfile = new DeviceProfile();
DeviceProfileData deviceProfileData = new DeviceProfileData();
MqttDeviceProfileTransportConfiguration mqttDeviceProfileTransportConfiguration = new MqttDeviceProfileTransportConfiguration();
mqttDeviceProfileTransportConfiguration.setTransportPayloadTypeConfiguration(new JsonTransportPayloadConfiguration());
deviceProfileData.setTransportConfiguration(mqttDeviceProfileTransportConfiguration);
deviceProfile.setProfileData(deviceProfileData);
deviceProfile.setTransportType(DeviceTransportType.MQTT);
handler.deviceSessionCtx.setDeviceProfile(deviceProfile);
handler.processRegularSessionMsg(ctx, message);
TbMsgMetaData expectedMd = new TbMsgMetaData();
expectedMd.putValue(DataConstants.MQTT_TOPIC, message.variableHeader().topicName());
verify(transportService, times(1)).process(any(), (TransportProtos.PostTelemetryMsg) any(), eq(expectedMd), any());
}
}

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.common.transport;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.common.transport.service.SessionMetaData;
@ -109,8 +110,12 @@ public interface TransportService {
void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TbMsgMetaData md, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TbMsgMetaData md, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback);

View File

@ -567,6 +567,11 @@ public class DefaultTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
process(sessionInfo, msg, null, callback);
}
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData md, TransportServiceCallback<Void> callback) {
int dataPoints = 0;
for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
dataPoints += tsKv.getKvCount();
@ -578,7 +583,7 @@ public class DefaultTransportService implements TransportService {
CustomerId customerId = getCustomerId(sessionInfo);
MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), new ApiStatsProxyCallback<>(tenantId, customerId, dataPoints, callback));
for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
TbMsgMetaData metaData = new TbMsgMetaData();
TbMsgMetaData metaData = md != null ? md.copy() : new TbMsgMetaData();
metaData.putValue("deviceName", sessionInfo.getDeviceName());
metaData.putValue("deviceType", sessionInfo.getDeviceType());
metaData.putValue("ts", tsKv.getTs() + "");
@ -590,12 +595,17 @@ public class DefaultTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
process(sessionInfo, msg, null, callback);
}
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TbMsgMetaData md, TransportServiceCallback<Void> callback) {
if (checkLimits(sessionInfo, msg, callback, msg.getKvCount())) {
reportActivityInternal(sessionInfo);
TenantId tenantId = getTenantId(sessionInfo);
DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
TbMsgMetaData metaData = new TbMsgMetaData();
TbMsgMetaData metaData = md != null ? md.copy() : new TbMsgMetaData();
metaData.putValue("deviceName", sessionInfo.getDeviceName());
metaData.putValue("deviceType", sessionInfo.getDeviceType());
if (msg.getShared()) {