From f5bd9a2133e4c442a054e620e774c141ff55cac3 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Thu, 27 Dec 2018 13:11:47 +0200 Subject: [PATCH 1/3] Issue #1327 fixed --- .../thingsboard/server/transport/mqtt/MqttTransportHandler.java | 1 + .../src/main/java/org/thingsboard/mqtt/MqttClientCallback.java | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 3986837f6f..1a759bc98c 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -271,6 +271,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC: case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: case MqttTopics.GATEWAY_RPC_TOPIC: + case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC: registerSubQoS(topic, grantedQoSList, reqQoS); break; diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java index 9f86b8e6cd..000925e8b4 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java @@ -15,8 +15,6 @@ */ package org.thingsboard.mqtt; -import io.netty.channel.ChannelId; - /** * Created by Valerii Sosliuk on 12/30/2017. */ From d6eb5727794f3265c313be628837734fc683c2ce Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 26 Dec 2018 20:46:03 +0200 Subject: [PATCH 2/3] Improved logging for bound statement --- .../util/AbstractBufferedRateExecutor.java | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java index a5415f0a1c..8a70059bb2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.dao.util; +import com.datastax.driver.core.*; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -22,6 +23,7 @@ import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.tools.TbRateLimits; +import org.thingsboard.server.dao.nosql.CassandraStatementTask; import javax.annotation.Nullable; import java.util.UUID; @@ -183,12 +185,39 @@ public abstract class AbstractBufferedRateExecutor taskCtx) { if (log.isTraceEnabled()) { - log.trace("[{}] {} task: {}", taskCtx.getId(), action, taskCtx); + if (taskCtx.getTask() instanceof CassandraStatementTask) { + CassandraStatementTask cassStmtTask = (CassandraStatementTask) taskCtx.getTask(); + if (cassStmtTask.getStatement() instanceof BoundStatement) { + BoundStatement stmt = (BoundStatement) cassStmtTask.getStatement(); + String query = toStringWithValues(stmt, ProtocolVersion.V5); + log.trace("[{}] {} task: {}, BoundStatement query: {}", taskCtx.getId(), action, taskCtx, query); + } + } else { + log.trace("[{}] {} task: {}", taskCtx.getId(), action, taskCtx); + } } else { log.debug("[{}] {} task", taskCtx.getId(), action); } } + private static String toStringWithValues(BoundStatement boundStatement, ProtocolVersion protocolVersion) { + CodecRegistry codecRegistry = boundStatement.preparedStatement().getCodecRegistry(); + PreparedStatement preparedStatement = boundStatement.preparedStatement(); + String query = preparedStatement.getQueryString(); + ColumnDefinitions defs = preparedStatement.getVariables(); + int index = 0; + for (ColumnDefinitions.Definition def : defs) { + DataType type = def.getType(); + TypeCodec codec = codecRegistry.codecFor(type); + if (boundStatement.getBytesUnsafe(index) != null) { + Object value = codec.deserialize(boundStatement.getBytesUnsafe(index), protocolVersion); + query = query.replaceFirst("\\?", codec.format(value)); + } + index++; + } + return query; + } + protected int getQueueSize() { return queue.size(); } From 920ebc792a1df4f89d7f78215a43994e5128f405 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Thu, 27 Dec 2018 14:02:03 +0200 Subject: [PATCH 3/3] Issue #1355 fixed --- .../service/transport/LocalTransportApiService.java | 6 +++--- .../transport/mqtt/MqttSslHandlerProvider.java | 12 ++++++------ .../mqtt/MqttTransportServerInitializer.java | 3 +-- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java index 7436de8b72..9366c7e150 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java @@ -100,7 +100,7 @@ public class LocalTransportApiService implements TransportApiService { //TODO: Make async and enable caching DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId); if (credentials != null && credentials.getCredentialsType() == credentialsType) { - return getDeviceInfo(credentials.getDeviceId()); + return getDeviceInfo(credentials.getDeviceId(), credentials); } else { return getEmptyTransportApiResponseFuture(); } @@ -135,7 +135,7 @@ public class LocalTransportApiService implements TransportApiService { } - private ListenableFuture getDeviceInfo(DeviceId deviceId) { + private ListenableFuture getDeviceInfo(DeviceId deviceId, DeviceCredentials credentials) { return Futures.transform(deviceService.findDeviceByIdAsync(TenantId.SYS_TENANT_ID, deviceId), device -> { if (device == null) { log.trace("[{}] Failed to lookup device by id", deviceId); @@ -143,7 +143,7 @@ public class LocalTransportApiService implements TransportApiService { } try { return TransportApiResponseMsg.newBuilder() - .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build(); + .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).setCredentialsBody(credentials.getCredentialsValue()).build()).build(); } catch (JsonProcessingException e) { log.warn("[{}] Failed to lookup device by id", deviceId, e); return getEmptyTransportApiResponse(); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java index aa643ebc9b..29e5859aea 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java @@ -54,18 +54,18 @@ import java.util.concurrent.TimeUnit; @Slf4j @Component("MqttSslHandlerProvider") @ConditionalOnExpression("'${transport.type:null}'=='null' || ('${transport.type}'=='local' && '${transport.http.enabled}'=='true')") -@ConditionalOnProperty(prefix = "mqtt.ssl", value = "enabled", havingValue = "true", matchIfMissing = false) +@ConditionalOnProperty(prefix = "transport.mqtt.ssl", value = "enabled", havingValue = "true", matchIfMissing = false) public class MqttSslHandlerProvider { - @Value("${mqtt.ssl.protocol}") + @Value("${transport.mqtt.ssl.protocol}") private String sslProtocol; - @Value("${mqtt.ssl.key_store}") + @Value("${transport.mqtt.ssl.key_store}") private String keyStoreFile; - @Value("${mqtt.ssl.key_store_password}") + @Value("${transport.mqtt.ssl.key_store_password}") private String keyStorePassword; - @Value("${mqtt.ssl.key_password}") + @Value("${transport.mqtt.ssl.key_password}") private String keyPassword; - @Value("${mqtt.ssl.key_store_type}") + @Value("${transport.mqtt.ssl.key_store_type}") private String keyStoreType; @Autowired diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java index 550837842f..eb8e5fb60f 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java @@ -36,9 +36,8 @@ public class MqttTransportServerInitializer extends ChannelInitializer