Added ability to provision credentials type from device using provision feature
This commit is contained in:
parent
5dd460b81d
commit
adf5069da1
@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.DataConstants;
|
|||||||
import org.thingsboard.server.common.data.Device;
|
import org.thingsboard.server.common.data.Device;
|
||||||
import org.thingsboard.server.common.data.DeviceProfile;
|
import org.thingsboard.server.common.data.DeviceProfile;
|
||||||
import org.thingsboard.server.common.data.audit.ActionType;
|
import org.thingsboard.server.common.data.audit.ActionType;
|
||||||
|
import org.thingsboard.server.common.data.device.credentials.BasicMqttCredentials;
|
||||||
import org.thingsboard.server.common.data.device.profile.AllowCreateNewDevicesDeviceProfileProvisionConfiguration;
|
import org.thingsboard.server.common.data.device.profile.AllowCreateNewDevicesDeviceProfileProvisionConfiguration;
|
||||||
import org.thingsboard.server.common.data.device.profile.CheckPreProvisionedDevicesDeviceProfileProvisionConfiguration;
|
import org.thingsboard.server.common.data.device.profile.CheckPreProvisionedDevicesDeviceProfileProvisionConfiguration;
|
||||||
import org.thingsboard.server.common.data.id.CustomerId;
|
import org.thingsboard.server.common.data.id.CustomerId;
|
||||||
@ -118,6 +119,13 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService {
|
|||||||
return Futures.immediateFuture(new ProvisionResponse(null, ProvisionResponseStatus.NOT_FOUND));
|
return Futures.immediateFuture(new ProvisionResponse(null, ProvisionResponseStatus.NOT_FOUND));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (provisionRequest.getCredentialsType() != null) {
|
||||||
|
ListenableFuture<ProvisionResponse> error = validateCredentials(provisionRequest);
|
||||||
|
if (error != null) {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
DeviceProfile targetProfile = deviceProfileDao.findByProvisionDeviceKey(provisionRequestKey);
|
DeviceProfile targetProfile = deviceProfileDao.findByProvisionDeviceKey(provisionRequestKey);
|
||||||
|
|
||||||
if (targetProfile == null) {
|
if (targetProfile == null) {
|
||||||
@ -152,6 +160,32 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService {
|
|||||||
return Futures.immediateFuture(new ProvisionResponse(null, ProvisionResponseStatus.NOT_FOUND));
|
return Futures.immediateFuture(new ProvisionResponse(null, ProvisionResponseStatus.NOT_FOUND));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ListenableFuture<ProvisionResponse> validateCredentials(ProvisionRequest provisionRequest) {
|
||||||
|
switch (provisionRequest.getCredentialsType()) {
|
||||||
|
case ACCESS_TOKEN:
|
||||||
|
if (StringUtils.isEmpty(provisionRequest.getCredentialsData().getToken())) {
|
||||||
|
log.error("Failed to get token from credentials data!");
|
||||||
|
return Futures.immediateFuture(new ProvisionResponse(null, ProvisionResponseStatus.FAILURE));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case MQTT_BASIC:
|
||||||
|
if (StringUtils.isEmpty(provisionRequest.getCredentialsData().getClientId()) ||
|
||||||
|
StringUtils.isEmpty(provisionRequest.getCredentialsData().getUsername()) ||
|
||||||
|
StringUtils.isEmpty(provisionRequest.getCredentialsData().getPassword())) {
|
||||||
|
log.error("Failed to get basic mqtt credentials from credentials data!");
|
||||||
|
return Futures.immediateFuture(new ProvisionResponse(null, ProvisionResponseStatus.FAILURE));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case X509_CERTIFICATE:
|
||||||
|
if (StringUtils.isEmpty(provisionRequest.getCredentialsData().getHash())) {
|
||||||
|
log.error("Failed to get hash from credentials data!");
|
||||||
|
return Futures.immediateFuture(new ProvisionResponse(null, ProvisionResponseStatus.FAILURE));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
private ListenableFuture<ProvisionResponse> processProvision(Device device, ProvisionRequest provisionRequest) {
|
private ListenableFuture<ProvisionResponse> processProvision(Device device, ProvisionRequest provisionRequest) {
|
||||||
ListenableFuture<Optional<AttributeKvEntry>> provisionStateFuture = attributesService.find(device.getTenantId(), device.getId(),
|
ListenableFuture<Optional<AttributeKvEntry>> provisionStateFuture = attributesService.find(device.getTenantId(), device.getId(),
|
||||||
DataConstants.SERVER_SCOPE, DEVICE_PROVISION_STATE);
|
DataConstants.SERVER_SCOPE, DEVICE_PROVISION_STATE);
|
||||||
@ -205,7 +239,7 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService {
|
|||||||
|
|
||||||
return Futures.transform(saveProvisionStateAttribute(savedDevice), input ->
|
return Futures.transform(saveProvisionStateAttribute(savedDevice), input ->
|
||||||
new ProvisionResponse(
|
new ProvisionResponse(
|
||||||
getDeviceCredentials(savedDevice, provisionRequest.getX509CertPubKey()),
|
getDeviceCredentials(savedDevice),
|
||||||
ProvisionResponseStatus.SUCCESS), MoreExecutors.directExecutor());
|
ProvisionResponseStatus.SUCCESS), MoreExecutors.directExecutor());
|
||||||
}
|
}
|
||||||
log.warn("[{}] The device is already provisioned!", device.getName());
|
log.warn("[{}] The device is already provisioned!", device.getName());
|
||||||
@ -224,17 +258,33 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService {
|
|||||||
device.setName(provisionRequest.getDeviceName());
|
device.setName(provisionRequest.getDeviceName());
|
||||||
device.setType(profile.getName());
|
device.setType(profile.getName());
|
||||||
device.setTenantId(profile.getTenantId());
|
device.setTenantId(profile.getTenantId());
|
||||||
return deviceService.saveDevice(device);
|
Device savedDevice = deviceService.saveDevice(device);
|
||||||
|
if (provisionRequest.getCredentialsType() != null) {
|
||||||
|
DeviceCredentials deviceCredentials = new DeviceCredentials();
|
||||||
|
deviceCredentials.setCredentialsType(provisionRequest.getCredentialsType());
|
||||||
|
switch (provisionRequest.getCredentialsType()) {
|
||||||
|
case ACCESS_TOKEN:
|
||||||
|
deviceCredentials.setDeviceId(savedDevice.getId());
|
||||||
|
deviceCredentials.setCredentialsId(provisionRequest.getCredentialsData().getToken());
|
||||||
|
break;
|
||||||
|
case MQTT_BASIC:
|
||||||
|
BasicMqttCredentials mqttCredentials = new BasicMqttCredentials();
|
||||||
|
mqttCredentials.setClientId(provisionRequest.getCredentialsData().getClientId());
|
||||||
|
mqttCredentials.setUserName(provisionRequest.getCredentialsData().getUsername());
|
||||||
|
mqttCredentials.setPassword(provisionRequest.getCredentialsData().getPassword());
|
||||||
|
deviceCredentials.setCredentialsValue(JacksonUtil.toString(mqttCredentials));
|
||||||
|
break;
|
||||||
|
case X509_CERTIFICATE:
|
||||||
|
deviceCredentials.setCredentialsValue(provisionRequest.getCredentialsData().getHash());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
deviceCredentialsService.updateDeviceCredentials(savedDevice.getTenantId(), deviceCredentials);
|
||||||
|
}
|
||||||
|
return savedDevice;
|
||||||
}
|
}
|
||||||
|
|
||||||
private DeviceCredentials getDeviceCredentials(Device device, String x509CertPubKey) {
|
private DeviceCredentials getDeviceCredentials(Device device) {
|
||||||
DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(device.getTenantId(), device.getId());
|
return deviceCredentialsService.findDeviceCredentialsByDeviceId(device.getTenantId(), device.getId());
|
||||||
if (!StringUtils.isEmpty(x509CertPubKey)) {
|
|
||||||
credentials.setCredentialsType(DeviceCredentialsType.X509_CERTIFICATE);
|
|
||||||
credentials.setCredentialsValue(x509CertPubKey);
|
|
||||||
return deviceCredentialsService.updateDeviceCredentials(device.getTenantId(), credentials);
|
|
||||||
}
|
|
||||||
return credentials;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void pushProvisionEventToRuleEngine(ProvisionRequest request, Device device, String type) {
|
private void pushProvisionEventToRuleEngine(ProvisionRequest request, Device device, String type) {
|
||||||
|
|||||||
@ -30,6 +30,7 @@ import org.thingsboard.server.common.data.Device;
|
|||||||
import org.thingsboard.server.common.data.DeviceProfile;
|
import org.thingsboard.server.common.data.DeviceProfile;
|
||||||
import org.thingsboard.server.common.data.TenantProfile;
|
import org.thingsboard.server.common.data.TenantProfile;
|
||||||
import org.thingsboard.server.common.data.device.credentials.BasicMqttCredentials;
|
import org.thingsboard.server.common.data.device.credentials.BasicMqttCredentials;
|
||||||
|
import org.thingsboard.server.common.data.device.credentials.ProvisionDeviceCredentialsData;
|
||||||
import org.thingsboard.server.common.data.device.profile.ProvisionDeviceProfileCredentials;
|
import org.thingsboard.server.common.data.device.profile.ProvisionDeviceProfileCredentials;
|
||||||
import org.thingsboard.server.common.data.id.CustomerId;
|
import org.thingsboard.server.common.data.id.CustomerId;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
@ -281,7 +282,12 @@ public class DefaultTransportApiService implements TransportApiService {
|
|||||||
provisionResponseFuture = deviceProvisionService.provisionDevice(
|
provisionResponseFuture = deviceProvisionService.provisionDevice(
|
||||||
new ProvisionRequest(
|
new ProvisionRequest(
|
||||||
requestMsg.getDeviceName(),
|
requestMsg.getDeviceName(),
|
||||||
requestMsg.getX509CertPubKey(),
|
requestMsg.getCredentialsType() != null ? DeviceCredentialsType.valueOf(requestMsg.getCredentialsType().name()) : null,
|
||||||
|
new ProvisionDeviceCredentialsData(requestMsg.getCredentialsDataProto().getValidateDeviceTokenRequestMsg().getToken(),
|
||||||
|
requestMsg.getCredentialsDataProto().getValidateBasicMqttCredRequestMsg().getClientId(),
|
||||||
|
requestMsg.getCredentialsDataProto().getValidateBasicMqttCredRequestMsg().getUserName(),
|
||||||
|
requestMsg.getCredentialsDataProto().getValidateBasicMqttCredRequestMsg().getPassword(),
|
||||||
|
requestMsg.getCredentialsDataProto().getValidateDeviceX509CertRequestMsg().getHash()),
|
||||||
new ProvisionDeviceProfileCredentials(
|
new ProvisionDeviceProfileCredentials(
|
||||||
requestMsg.getProvisionDeviceCredentialsMsg().getProvisionDeviceKey(),
|
requestMsg.getProvisionDeviceCredentialsMsg().getProvisionDeviceKey(),
|
||||||
requestMsg.getProvisionDeviceCredentialsMsg().getProvisionDeviceSecret())));
|
requestMsg.getProvisionDeviceCredentialsMsg().getProvisionDeviceSecret())));
|
||||||
|
|||||||
@ -183,7 +183,7 @@ public abstract class AbstractMqttProvisionProtoDeviceTest extends AbstractMqttI
|
|||||||
|
|
||||||
|
|
||||||
protected byte[] createTestProvisionMessage() {
|
protected byte[] createTestProvisionMessage() {
|
||||||
return ProvisionDeviceRequestMsg.newBuilder().setX509CertPubKey("").setDeviceName("Test Provision device").setProvisionDeviceCredentialsMsg(ProvisionDeviceCredentialsMsg.newBuilder().setProvisionDeviceKey("testProvisionKey").setProvisionDeviceSecret("testProvisionSecret")).build().toByteArray();
|
return ProvisionDeviceRequestMsg.newBuilder().setDeviceName("Test Provision device").setProvisionDeviceCredentialsMsg(ProvisionDeviceCredentialsMsg.newBuilder().setProvisionDeviceKey("testProvisionKey").setProvisionDeviceSecret("testProvisionSecret")).build().toByteArray();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,12 +17,15 @@ package org.thingsboard.server.dao.device.provision;
|
|||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
import org.thingsboard.server.common.data.device.credentials.ProvisionDeviceCredentialsData;
|
||||||
import org.thingsboard.server.common.data.device.profile.ProvisionDeviceProfileCredentials;
|
import org.thingsboard.server.common.data.device.profile.ProvisionDeviceProfileCredentials;
|
||||||
|
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
public class ProvisionRequest {
|
public class ProvisionRequest {
|
||||||
private String deviceName;
|
private String deviceName;
|
||||||
private String x509CertPubKey;
|
private DeviceCredentialsType credentialsType;
|
||||||
|
private ProvisionDeviceCredentialsData credentialsData;
|
||||||
private ProvisionDeviceProfileCredentials credentials;
|
private ProvisionDeviceProfileCredentials credentials;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -79,5 +79,11 @@ public class DataConstants {
|
|||||||
public static final String DEVICE_NAME = "deviceName";
|
public static final String DEVICE_NAME = "deviceName";
|
||||||
public static final String DEVICE_TYPE = "deviceType";
|
public static final String DEVICE_TYPE = "deviceType";
|
||||||
public static final String CERT_PUB_KEY = "x509CertPubKey";
|
public static final String CERT_PUB_KEY = "x509CertPubKey";
|
||||||
|
public static final String CREDENTIALS_TYPE = "credentialsType";
|
||||||
|
public static final String TOKEN = "token";
|
||||||
|
public static final String HASH = "hash";
|
||||||
|
public static final String CLIENT_ID = "clientId";
|
||||||
|
public static final String USERNAME = "username";
|
||||||
|
public static final String PASSWORD = "password";
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,27 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2020 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.common.data.device.credentials;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public class ProvisionDeviceCredentialsData {
|
||||||
|
private final String token;
|
||||||
|
private final String clientId;
|
||||||
|
private final String username;
|
||||||
|
private final String password;
|
||||||
|
private final String hash;
|
||||||
|
}
|
||||||
@ -68,8 +68,6 @@ public class MqttTopics {
|
|||||||
public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + RPC;
|
public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + RPC;
|
||||||
public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_REQUEST;
|
public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_REQUEST;
|
||||||
public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_RESPONSE;
|
public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_RESPONSE;
|
||||||
public static final String GATEWAY_PROVISION_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + PROVISION + REQUEST;
|
|
||||||
public static final String GATEWAY_PROVISION_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + PROVISION + RESPONSE;
|
|
||||||
|
|
||||||
private MqttTopics() {
|
private MqttTopics() {
|
||||||
}
|
}
|
||||||
|
|||||||
@ -76,6 +76,7 @@ enum KeyValueType {
|
|||||||
enum CredentialsType {
|
enum CredentialsType {
|
||||||
ACCESS_TOKEN = 0;
|
ACCESS_TOKEN = 0;
|
||||||
X509_CERTIFICATE = 1;
|
X509_CERTIFICATE = 1;
|
||||||
|
BASIC_MQTT = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
message KeyValueProto {
|
message KeyValueProto {
|
||||||
@ -254,10 +255,22 @@ message DeviceCredentialsProto {
|
|||||||
string credentialsValue = 5;
|
string credentialsValue = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message CredentialsDataProto {
|
||||||
|
ValidateDeviceTokenRequestMsg validateDeviceTokenRequestMsg = 1;
|
||||||
|
ValidateDeviceX509CertRequestMsg validateDeviceX509CertRequestMsg = 2;
|
||||||
|
ValidateBasicMqttCredRequestMsg validateBasicMqttCredRequestMsg = 3;
|
||||||
|
}
|
||||||
|
|
||||||
message ProvisionDeviceRequestMsg {
|
message ProvisionDeviceRequestMsg {
|
||||||
string deviceName = 1;
|
string deviceName = 1;
|
||||||
string x509CertPubKey = 2;
|
CredentialsType credentialsType = 2;
|
||||||
ProvisionDeviceCredentialsMsg provisionDeviceCredentialsMsg = 3;
|
ProvisionDeviceCredentialsMsg provisionDeviceCredentialsMsg = 3;
|
||||||
|
CredentialsDataProto credentialsDataProto = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message GatewayProvisionRequestMsg {
|
||||||
|
int32 requestId = 1;
|
||||||
|
ProvisionDeviceRequestMsg provisionDeviceRequestMsg = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
message ProvisionDeviceCredentialsMsg {
|
message ProvisionDeviceCredentialsMsg {
|
||||||
@ -270,6 +283,11 @@ message ProvisionDeviceResponseMsg {
|
|||||||
ProvisionResponseStatus provisionResponseStatus = 2;
|
ProvisionResponseStatus provisionResponseStatus = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message GatewayProvisionResponseMsg {
|
||||||
|
int32 requestId = 1;
|
||||||
|
ProvisionDeviceResponseMsg provisionDeviceResponseMsg = 2;
|
||||||
|
}
|
||||||
|
|
||||||
enum ProvisionResponseStatus {
|
enum ProvisionResponseStatus {
|
||||||
UNKNOWN = 0;
|
UNKNOWN = 0;
|
||||||
SUCCESS = 1;
|
SUCCESS = 1;
|
||||||
|
|||||||
@ -17,7 +17,6 @@ package org.thingsboard.server.transport.mqtt;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.google.gson.JsonParseException;
|
import com.google.gson.JsonParseException;
|
||||||
import com.google.gson.JsonSyntaxException;
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
|
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
|
||||||
@ -70,11 +69,10 @@ import java.io.IOException;
|
|||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.Date;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
|
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
|
||||||
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
|
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
|
||||||
@ -156,11 +154,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
if (topicName.equals(MqttTopics.DEVICE_PROVISION_REQUEST_TOPIC)) {
|
if (topicName.equals(MqttTopics.DEVICE_PROVISION_REQUEST_TOPIC)) {
|
||||||
try {
|
try {
|
||||||
TransportProtos.ProvisionDeviceRequestMsg provisionRequestMsg = deviceSessionCtx.getContext().getJsonMqttAdaptor().convertToProvisionRequestMsg(deviceSessionCtx, mqttMsg);
|
TransportProtos.ProvisionDeviceRequestMsg provisionRequestMsg = deviceSessionCtx.getContext().getJsonMqttAdaptor().convertToProvisionRequestMsg(deviceSessionCtx, mqttMsg);
|
||||||
|
validateProvisionMessage(provisionRequestMsg);
|
||||||
transportService.process(provisionRequestMsg, new DeviceProvisionCallback(ctx, msgId, provisionRequestMsg));
|
transportService.process(provisionRequestMsg, new DeviceProvisionCallback(ctx, msgId, provisionRequestMsg));
|
||||||
log.trace("[{}][{}] Processing provision publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);
|
log.trace("[{}][{}] Processing provision publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (e instanceof JsonParseException || (e.getCause() != null && e.getCause() instanceof JsonParseException)) {
|
if (e instanceof JsonParseException || (e.getCause() != null && e.getCause() instanceof JsonParseException)) {
|
||||||
TransportProtos.ProvisionDeviceRequestMsg provisionRequestMsg = deviceSessionCtx.getContext().getProtoMqttAdaptor().convertToProvisionRequestMsg(deviceSessionCtx, mqttMsg);
|
TransportProtos.ProvisionDeviceRequestMsg provisionRequestMsg = deviceSessionCtx.getContext().getProtoMqttAdaptor().convertToProvisionRequestMsg(deviceSessionCtx, mqttMsg);
|
||||||
|
validateProvisionMessage(provisionRequestMsg);
|
||||||
transportService.process(provisionRequestMsg, new DeviceProvisionCallback(ctx, msgId, provisionRequestMsg));
|
transportService.process(provisionRequestMsg, new DeviceProvisionCallback(ctx, msgId, provisionRequestMsg));
|
||||||
deviceSessionCtx.setProvisionPayloadType(TransportPayloadType.PROTOBUF);
|
deviceSessionCtx.setProvisionPayloadType(TransportPayloadType.PROTOBUF);
|
||||||
log.trace("[{}][{}] Processing provision publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);
|
log.trace("[{}][{}] Processing provision publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);
|
||||||
@ -187,6 +187,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void validateProvisionMessage(TransportProtos.ProvisionDeviceRequestMsg provisionRequestMsg) {
|
||||||
|
if (provisionRequestMsg.getProvisionDeviceCredentialsMsg().getProvisionDeviceKey() != null &&
|
||||||
|
provisionRequestMsg.getProvisionDeviceCredentialsMsg().getProvisionDeviceSecret() != null &&
|
||||||
|
provisionRequestMsg.getDeviceName() != null)
|
||||||
|
throw new RuntimeException("Wrong credentials!");
|
||||||
|
}
|
||||||
|
|
||||||
private void processRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) {
|
private void processRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) {
|
||||||
switch (msg.fixedHeader().messageType()) {
|
switch (msg.fixedHeader().messageType()) {
|
||||||
case PUBLISH:
|
case PUBLISH:
|
||||||
@ -335,9 +342,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
} else {
|
} else {
|
||||||
deviceSessionCtx.getContext().getProtoMqttAdaptor().convertToPublish(deviceSessionCtx, provisionResponseMsg).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
|
deviceSessionCtx.getContext().getProtoMqttAdaptor().convertToPublish(deviceSessionCtx, provisionResponseMsg).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
|
||||||
}
|
}
|
||||||
|
transportService.getSchedulerExecutor().schedule(() -> processDisconnect(ctx), 60, TimeUnit.SECONDS);
|
||||||
//TODO: close session with some delay.
|
|
||||||
//transportService.getScheduler().submit task with 60 seconds delay to close the session.
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
|
log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
|
||||||
}
|
}
|
||||||
@ -379,7 +384,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
|
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
|
||||||
case MqttTopics.GATEWAY_RPC_TOPIC:
|
case MqttTopics.GATEWAY_RPC_TOPIC:
|
||||||
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
|
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
|
||||||
case MqttTopics.GATEWAY_PROVISION_RESPONSE_TOPIC:
|
|
||||||
case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
|
case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
|
||||||
registerSubQoS(topic, grantedQoSList, reqQoS);
|
registerSubQoS(topic, grantedQoSList, reqQoS);
|
||||||
break;
|
break;
|
||||||
@ -447,7 +451,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
private void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
|
private void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
|
||||||
log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier());
|
log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier());
|
||||||
String userName = msg.payload().userName();
|
String userName = msg.payload().userName();
|
||||||
if (DataConstants.PROVISION.equals(userName)) {
|
String clientId = msg.payload().clientIdentifier();
|
||||||
|
if (DataConstants.PROVISION.equals(userName) || DataConstants.PROVISION.equals(clientId)) {
|
||||||
deviceSessionCtx.setProvisionOnly(true);
|
deviceSessionCtx.setProvisionOnly(true);
|
||||||
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
|
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@ -153,13 +153,6 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
|
|||||||
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC, JsonConverter.toJson(provisionResponse)));
|
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC, JsonConverter.toJson(provisionResponse)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.ProvisionDeviceResponseMsg responseMsg) {
|
|
||||||
return Optional.of(createMqttPublishMsg(ctx,
|
|
||||||
MqttTopics.GATEWAY_PROVISION_RESPONSE_TOPIC,
|
|
||||||
JsonConverter.toGatewayJson(deviceName, responseMsg)));
|
|
||||||
}
|
|
||||||
|
|
||||||
public static JsonElement validateJsonPayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException {
|
public static JsonElement validateJsonPayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException {
|
||||||
String payload = validatePayload(sessionId, payloadData, false);
|
String payload = validatePayload(sessionId, payloadData, false);
|
||||||
try {
|
try {
|
||||||
|
|||||||
@ -69,6 +69,4 @@ public interface MqttTransportAdaptor {
|
|||||||
|
|
||||||
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ProvisionDeviceResponseMsg provisionResponse) throws AdaptorException;
|
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ProvisionDeviceResponseMsg provisionResponse) throws AdaptorException;
|
||||||
|
|
||||||
Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, ProvisionDeviceResponseMsg provisionResponse) throws AdaptorException;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -187,15 +187,6 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
|
|||||||
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_RPC_TOPIC, payloadBytes));
|
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_RPC_TOPIC, payloadBytes));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.ProvisionDeviceResponseMsg responseMsg) throws AdaptorException {
|
|
||||||
TransportApiProtos.GatewayProvisionResponseMsg.Builder builder = TransportApiProtos.GatewayProvisionResponseMsg.newBuilder();
|
|
||||||
builder.setDeviceName(deviceName);
|
|
||||||
builder.setProvisionDeviceResponseMsg(responseMsg);
|
|
||||||
byte[] payloadBytes = builder.build().toByteArray();
|
|
||||||
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_PROVISION_RESPONSE_TOPIC, payloadBytes));
|
|
||||||
}
|
|
||||||
|
|
||||||
public static byte[] toBytes(ByteBuf inbound) {
|
public static byte[] toBytes(ByteBuf inbound) {
|
||||||
byte[] bytes = new byte[inbound.readableBytes()];
|
byte[] bytes = new byte[inbound.readableBytes()];
|
||||||
int readerIndex = inbound.readerIndex();
|
int readerIndex = inbound.readerIndex();
|
||||||
|
|||||||
@ -40,6 +40,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateBasicMqttCre
|
|||||||
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
|
||||||
|
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by ashvayka on 04.10.18.
|
* Created by ashvayka on 04.10.18.
|
||||||
*/
|
*/
|
||||||
@ -88,6 +90,8 @@ public interface TransportService {
|
|||||||
|
|
||||||
void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback);
|
void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback);
|
||||||
|
|
||||||
|
ScheduledExecutorService getSchedulerExecutor();
|
||||||
|
|
||||||
void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
|
void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
|
||||||
|
|
||||||
void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout);
|
void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout);
|
||||||
|
|||||||
@ -37,6 +37,8 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
|
|||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.CredentialsDataProto.Builder;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.CredentialsType;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
|
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
|
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
|
||||||
@ -46,7 +48,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRespo
|
|||||||
import org.thingsboard.server.gen.transport.TransportProtos.ProvisionResponseStatus;
|
import org.thingsboard.server.gen.transport.TransportProtos.ProvisionResponseStatus;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto;
|
import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
|
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.ValidateBasicMqttCredRequestMsg;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
|
||||||
|
|
||||||
|
import javax.xml.crypto.Data;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
@ -554,7 +560,17 @@ public class JsonConverter {
|
|||||||
private static TransportProtos.ProvisionDeviceRequestMsg buildProvisionRequestMsg(JsonObject jo) {
|
private static TransportProtos.ProvisionDeviceRequestMsg buildProvisionRequestMsg(JsonObject jo) {
|
||||||
return TransportProtos.ProvisionDeviceRequestMsg.newBuilder()
|
return TransportProtos.ProvisionDeviceRequestMsg.newBuilder()
|
||||||
.setDeviceName(getStrValue(jo, DataConstants.DEVICE_NAME, true))
|
.setDeviceName(getStrValue(jo, DataConstants.DEVICE_NAME, true))
|
||||||
.setX509CertPubKey(getStrValue(jo, DataConstants.CERT_PUB_KEY, false))
|
.setCredentialsType(TransportProtos.CredentialsType.valueOf(getStrValue(jo, DataConstants.CREDENTIALS_TYPE, false)))
|
||||||
|
.setCredentialsDataProto(TransportProtos.CredentialsDataProto.newBuilder()
|
||||||
|
.setValidateDeviceTokenRequestMsg(ValidateDeviceTokenRequestMsg.newBuilder().setToken(getStrValue(jo, DataConstants.TOKEN, false)).build())
|
||||||
|
.setValidateBasicMqttCredRequestMsg(ValidateBasicMqttCredRequestMsg.newBuilder()
|
||||||
|
.setClientId(getStrValue(jo, DataConstants.CLIENT_ID, false))
|
||||||
|
.setUserName(getStrValue(jo, DataConstants.USERNAME, false))
|
||||||
|
.setPassword(getStrValue(jo, DataConstants.PASSWORD, false))
|
||||||
|
.build())
|
||||||
|
.setValidateDeviceX509CertRequestMsg(ValidateDeviceX509CertRequestMsg.newBuilder()
|
||||||
|
.setHash(getStrValue(jo, DataConstants.PASSWORD, false)).build())
|
||||||
|
.build())
|
||||||
.setProvisionDeviceCredentialsMsg(buildProvisionDeviceCredentialsMsg(
|
.setProvisionDeviceCredentialsMsg(buildProvisionDeviceCredentialsMsg(
|
||||||
getStrValue(jo, DataConstants.PROVISION_KEY, true),
|
getStrValue(jo, DataConstants.PROVISION_KEY, true),
|
||||||
getStrValue(jo, DataConstants.PROVISION_SECRET, true)))
|
getStrValue(jo, DataConstants.PROVISION_SECRET, true)))
|
||||||
@ -567,6 +583,8 @@ public class JsonConverter {
|
|||||||
.setProvisionDeviceSecret(provisionSecret)
|
.setProvisionDeviceSecret(provisionSecret)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static String getStrValue(JsonObject jo, String field, boolean requiredField) {
|
private static String getStrValue(JsonObject jo, String field, boolean requiredField) {
|
||||||
if (jo.has(field)) {
|
if (jo.has(field)) {
|
||||||
return jo.get(field).getAsString();
|
return jo.get(field).getAsString();
|
||||||
|
|||||||
@ -34,7 +34,6 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
|
|||||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.common.msg.TbMsgDataType;
|
|
||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
import org.thingsboard.server.common.msg.queue.ServiceQueue;
|
import org.thingsboard.server.common.msg.queue.ServiceQueue;
|
||||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
@ -49,7 +48,6 @@ import org.thingsboard.server.common.transport.TransportServiceCallback;
|
|||||||
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
|
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
|
||||||
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
|
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
|
||||||
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
|
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
|
||||||
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
|
|
||||||
import org.thingsboard.server.common.transport.util.JsonUtils;
|
import org.thingsboard.server.common.transport.util.JsonUtils;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg;
|
||||||
@ -77,11 +75,9 @@ import org.thingsboard.server.common.stats.StatsType;
|
|||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import javax.annotation.PreDestroy;
|
import javax.annotation.PreDestroy;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
@ -93,7 +89,6 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.Function;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by ashvayka on 17.10.18.
|
* Created by ashvayka on 17.10.18.
|
||||||
@ -236,6 +231,11 @@ public class DefaultTransportService implements TransportService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScheduledExecutorService getSchedulerExecutor(){
|
||||||
|
return this.schedulerExecutor;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) {
|
public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) {
|
||||||
sessions.putIfAbsent(toSessionId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener));
|
sessions.putIfAbsent(toSessionId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener));
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user