diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java index 7436de8b72..9366c7e150 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java @@ -100,7 +100,7 @@ public class LocalTransportApiService implements TransportApiService { //TODO: Make async and enable caching DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId); if (credentials != null && credentials.getCredentialsType() == credentialsType) { - return getDeviceInfo(credentials.getDeviceId()); + return getDeviceInfo(credentials.getDeviceId(), credentials); } else { return getEmptyTransportApiResponseFuture(); } @@ -135,7 +135,7 @@ public class LocalTransportApiService implements TransportApiService { } - private ListenableFuture getDeviceInfo(DeviceId deviceId) { + private ListenableFuture getDeviceInfo(DeviceId deviceId, DeviceCredentials credentials) { return Futures.transform(deviceService.findDeviceByIdAsync(TenantId.SYS_TENANT_ID, deviceId), device -> { if (device == null) { log.trace("[{}] Failed to lookup device by id", deviceId); @@ -143,7 +143,7 @@ public class LocalTransportApiService implements TransportApiService { } try { return TransportApiResponseMsg.newBuilder() - .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build(); + .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).setCredentialsBody(credentials.getCredentialsValue()).build()).build(); } catch (JsonProcessingException e) { log.warn("[{}] Failed to lookup device by id", deviceId, e); return getEmptyTransportApiResponse(); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java index aa643ebc9b..29e5859aea 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java @@ -54,18 +54,18 @@ import java.util.concurrent.TimeUnit; @Slf4j @Component("MqttSslHandlerProvider") @ConditionalOnExpression("'${transport.type:null}'=='null' || ('${transport.type}'=='local' && '${transport.http.enabled}'=='true')") -@ConditionalOnProperty(prefix = "mqtt.ssl", value = "enabled", havingValue = "true", matchIfMissing = false) +@ConditionalOnProperty(prefix = "transport.mqtt.ssl", value = "enabled", havingValue = "true", matchIfMissing = false) public class MqttSslHandlerProvider { - @Value("${mqtt.ssl.protocol}") + @Value("${transport.mqtt.ssl.protocol}") private String sslProtocol; - @Value("${mqtt.ssl.key_store}") + @Value("${transport.mqtt.ssl.key_store}") private String keyStoreFile; - @Value("${mqtt.ssl.key_store_password}") + @Value("${transport.mqtt.ssl.key_store_password}") private String keyStorePassword; - @Value("${mqtt.ssl.key_password}") + @Value("${transport.mqtt.ssl.key_password}") private String keyPassword; - @Value("${mqtt.ssl.key_store_type}") + @Value("${transport.mqtt.ssl.key_store_type}") private String keyStoreType; @Autowired diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 3986837f6f..1a759bc98c 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -271,6 +271,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC: case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: case MqttTopics.GATEWAY_RPC_TOPIC: + case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC: registerSubQoS(topic, grantedQoSList, reqQoS); break; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java index 550837842f..eb8e5fb60f 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java @@ -36,9 +36,8 @@ public class MqttTransportServerInitializer extends ChannelInitializer taskCtx) { if (log.isTraceEnabled()) { - log.trace("[{}] {} task: {}", taskCtx.getId(), action, taskCtx); + if (taskCtx.getTask() instanceof CassandraStatementTask) { + CassandraStatementTask cassStmtTask = (CassandraStatementTask) taskCtx.getTask(); + if (cassStmtTask.getStatement() instanceof BoundStatement) { + BoundStatement stmt = (BoundStatement) cassStmtTask.getStatement(); + String query = toStringWithValues(stmt, ProtocolVersion.V5); + log.trace("[{}] {} task: {}, BoundStatement query: {}", taskCtx.getId(), action, taskCtx, query); + } + } else { + log.trace("[{}] {} task: {}", taskCtx.getId(), action, taskCtx); + } } else { log.debug("[{}] {} task", taskCtx.getId(), action); } } + private static String toStringWithValues(BoundStatement boundStatement, ProtocolVersion protocolVersion) { + CodecRegistry codecRegistry = boundStatement.preparedStatement().getCodecRegistry(); + PreparedStatement preparedStatement = boundStatement.preparedStatement(); + String query = preparedStatement.getQueryString(); + ColumnDefinitions defs = preparedStatement.getVariables(); + int index = 0; + for (ColumnDefinitions.Definition def : defs) { + DataType type = def.getType(); + TypeCodec codec = codecRegistry.codecFor(type); + if (boundStatement.getBytesUnsafe(index) != null) { + Object value = codec.deserialize(boundStatement.getBytesUnsafe(index), protocolVersion); + query = query.replaceFirst("\\?", codec.format(value)); + } + index++; + } + return query; + } + protected int getQueueSize() { return queue.size(); } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java index 9f86b8e6cd..000925e8b4 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java @@ -15,8 +15,6 @@ */ package org.thingsboard.mqtt; -import io.netty.channel.ChannelId; - /** * Created by Valerii Sosliuk on 12/30/2017. */