Added topic to metadata for attributes and telemetry messages

This commit is contained in:
imbeacon 2023-08-11 11:48:24 +03:00
parent 91e94d5777
commit 355a92588a
4 changed files with 36 additions and 10 deletions

View File

@ -124,6 +124,8 @@ public class DataConstants {
public static final String LAST_CONNECTED_GATEWAY = "lastConnectedGateway"; public static final String LAST_CONNECTED_GATEWAY = "lastConnectedGateway";
public static final String TOPIC = "topic";
public static final String MAIN_QUEUE_NAME = "Main"; public static final String MAIN_QUEUE_NAME = "Main";
public static final String MAIN_QUEUE_TOPIC = "tb_rule_engine.main"; public static final String MAIN_QUEUE_TOPIC = "tb_rule_engine.main";
public static final String HP_QUEUE_NAME = "HighPriority"; public static final String HP_QUEUE_NAME = "HighPriority";

View File

@ -30,6 +30,7 @@ import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttPubAckMessage; import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload; import io.netty.handler.codec.mqtt.MqttSubAckPayload;
@ -59,6 +60,7 @@ import org.thingsboard.server.common.data.id.OtaPackageId;
import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.msg.EncryptionUtil; import org.thingsboard.server.common.msg.EncryptionUtil;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportService;
@ -440,12 +442,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try { try {
Matcher fwMatcher; Matcher fwMatcher;
MqttTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor(); MqttTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
TbMsgMetaData md = createMetadataWithTopic(topicName);
if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) { if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg); TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg)); transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, md, getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) { } else if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg); TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg)); transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, md, getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) { } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX); TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg)); transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
@ -466,22 +469,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.SOFTWARE); getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.SOFTWARE);
} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC)) { } else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg); TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg)); transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, md, getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC)) { } else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getJsonMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg); TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getJsonMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg)); transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, md, getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC)) { } else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getProtoMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg); TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getProtoMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg)); transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, md, getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC)) { } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC)) {
TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg); TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg)); transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, md, getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC)) { } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC)) {
TransportProtos.PostAttributeMsg postAttributeMsg = context.getJsonMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg); TransportProtos.PostAttributeMsg postAttributeMsg = context.getJsonMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg)); transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, md, getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC)) { } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC)) {
TransportProtos.PostAttributeMsg postAttributeMsg = context.getProtoMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg); TransportProtos.PostAttributeMsg postAttributeMsg = context.getProtoMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg)); transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, md, getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC)) { } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC)) {
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getJsonMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC); TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getJsonMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg)); transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
@ -525,6 +528,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
} }
private static TbMsgMetaData createMetadataWithTopic(String topicName) {
TbMsgMetaData md = new TbMsgMetaData();
md.putValue(DataConstants.TOPIC, topicName);
return md;
}
private void sendAckOrCloseSession(ChannelHandlerContext ctx, String topicName, int msgId) { private void sendAckOrCloseSession(ChannelHandlerContext ctx, String topicName, int msgId) {
if ((deviceSessionCtx.isSendAckOnValidationException() || MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) && msgId > 0) { if ((deviceSessionCtx.isSendAckOnValidationException() || MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) && msgId > 0) {
log.debug("[{}] Send pub ack on invalid publish msg [{}][{}]", sessionId, topicName, msgId); log.debug("[{}] Send pub ack on invalid publish msg [{}][{}]", sessionId, topicName, msgId);

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.common.transport;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.common.transport.service.SessionMetaData; import org.thingsboard.server.common.transport.service.SessionMetaData;
@ -109,8 +110,12 @@ public interface TransportService {
void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback); void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TbMsgMetaData md, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback); void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TbMsgMetaData md, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback); void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback); void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback);

View File

@ -567,6 +567,11 @@ public class DefaultTransportService implements TransportService {
@Override @Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) { public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
process(sessionInfo, msg, null, callback);
}
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData md, TransportServiceCallback<Void> callback) {
int dataPoints = 0; int dataPoints = 0;
for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
dataPoints += tsKv.getKvCount(); dataPoints += tsKv.getKvCount();
@ -578,7 +583,7 @@ public class DefaultTransportService implements TransportService {
CustomerId customerId = getCustomerId(sessionInfo); CustomerId customerId = getCustomerId(sessionInfo);
MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), new ApiStatsProxyCallback<>(tenantId, customerId, dataPoints, callback)); MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), new ApiStatsProxyCallback<>(tenantId, customerId, dataPoints, callback));
for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
TbMsgMetaData metaData = new TbMsgMetaData(); TbMsgMetaData metaData = md != null ? md.copy() : new TbMsgMetaData();
metaData.putValue("deviceName", sessionInfo.getDeviceName()); metaData.putValue("deviceName", sessionInfo.getDeviceName());
metaData.putValue("deviceType", sessionInfo.getDeviceType()); metaData.putValue("deviceType", sessionInfo.getDeviceType());
metaData.putValue("ts", tsKv.getTs() + ""); metaData.putValue("ts", tsKv.getTs() + "");
@ -590,12 +595,17 @@ public class DefaultTransportService implements TransportService {
@Override @Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) { public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
process(sessionInfo, msg, null, callback);
}
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TbMsgMetaData md, TransportServiceCallback<Void> callback) {
if (checkLimits(sessionInfo, msg, callback, msg.getKvCount())) { if (checkLimits(sessionInfo, msg, callback, msg.getKvCount())) {
reportActivityInternal(sessionInfo); reportActivityInternal(sessionInfo);
TenantId tenantId = getTenantId(sessionInfo); TenantId tenantId = getTenantId(sessionInfo);
DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
TbMsgMetaData metaData = new TbMsgMetaData(); TbMsgMetaData metaData = md != null ? md.copy() : new TbMsgMetaData();
metaData.putValue("deviceName", sessionInfo.getDeviceName()); metaData.putValue("deviceName", sessionInfo.getDeviceName());
metaData.putValue("deviceType", sessionInfo.getDeviceType()); metaData.putValue("deviceType", sessionInfo.getDeviceType());
if (msg.getShared()) { if (msg.getShared()) {