Transport Refactoring to fetch Device Profile with Credentials

This commit is contained in:
Andrii Shvaika 2020-09-03 18:20:49 +03:00
parent 1e87c72818
commit 01ec9f048d
14 changed files with 280 additions and 95 deletions

View File

@ -32,6 +32,8 @@ import org.thingsboard.server.common.transport.TransportContext;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.lang.reflect.Field;
@ -295,7 +297,7 @@ public class CoapTransportResource extends CoapResource {
return this;
}
private static class DeviceAuthCallback implements TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> {
private static class DeviceAuthCallback implements TransportServiceCallback<ValidateDeviceCredentialsResponse> {
private final TransportContext transportContext;
private final CoapExchange exchange;
private final Consumer<TransportProtos.SessionInfoProto> onSuccess;
@ -307,22 +309,9 @@ public class CoapTransportResource extends CoapResource {
}
@Override
public void onSuccess(TransportProtos.ValidateDeviceCredentialsResponseMsg msg) {
public void onSuccess(ValidateDeviceCredentialsResponse msg) {
if (msg.hasDeviceInfo()) {
UUID sessionId = UUID.randomUUID();
TransportProtos.DeviceInfoProto deviceInfoProto = msg.getDeviceInfo();
TransportProtos.SessionInfoProto sessionInfo = TransportProtos.SessionInfoProto.newBuilder()
.setNodeId(transportContext.getNodeId())
.setTenantIdMSB(deviceInfoProto.getTenantIdMSB())
.setTenantIdLSB(deviceInfoProto.getTenantIdLSB())
.setDeviceIdMSB(deviceInfoProto.getDeviceIdMSB())
.setDeviceIdLSB(deviceInfoProto.getDeviceIdLSB())
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setDeviceName(msg.getDeviceInfo().getDeviceName())
.setDeviceType(msg.getDeviceInfo().getDeviceType())
.build();
onSuccess.accept(sessionInfo);
onSuccess.accept(SessionInfoCreator.create(msg, transportContext, UUID.randomUUID()));
} else {
exchange.respond(ResponseCode.UNAUTHORIZED);
}

View File

@ -36,6 +36,8 @@ import org.thingsboard.server.common.transport.TransportContext;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
@ -200,7 +202,7 @@ public class DeviceApiController {
return responseWriter;
}
private static class DeviceAuthCallback implements TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> {
private static class DeviceAuthCallback implements TransportServiceCallback<ValidateDeviceCredentialsResponse> {
private final TransportContext transportContext;
private final DeferredResult<ResponseEntity> responseWriter;
private final Consumer<SessionInfoProto> onSuccess;
@ -212,22 +214,9 @@ public class DeviceApiController {
}
@Override
public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
public void onSuccess(ValidateDeviceCredentialsResponse msg) {
if (msg.hasDeviceInfo()) {
UUID sessionId = UUID.randomUUID();
DeviceInfoProto deviceInfoProto = msg.getDeviceInfo();
SessionInfoProto sessionInfo = SessionInfoProto.newBuilder()
.setNodeId(transportContext.getNodeId())
.setTenantIdMSB(deviceInfoProto.getTenantIdMSB())
.setTenantIdLSB(deviceInfoProto.getTenantIdLSB())
.setDeviceIdMSB(deviceInfoProto.getDeviceIdMSB())
.setDeviceIdLSB(deviceInfoProto.getDeviceIdLSB())
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setDeviceName(msg.getDeviceInfo().getDeviceName())
.setDeviceType(msg.getDeviceInfo().getDeviceType())
.build();
onSuccess.accept(sessionInfo);
onSuccess.accept(SessionInfoCreator.create(msg, transportContext, UUID.randomUUID()));
} else {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
}

View File

@ -27,6 +27,7 @@ import org.springframework.util.StringUtils;
import org.thingsboard.server.common.msg.EncryptionUtil;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.util.SslUtil;
@ -157,11 +158,11 @@ public class MqttSslHandlerProvider {
final String[] credentialsBodyHolder = new String[1];
CountDownLatch latch = new CountDownLatch(1);
transportService.process(TransportProtos.ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(),
new TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg>() {
new TransportServiceCallback<ValidateDeviceCredentialsResponse>() {
@Override
public void onSuccess(TransportProtos.ValidateDeviceCredentialsResponseMsg msg) {
if (!StringUtils.isEmpty(msg.getCredentialsBody())) {
credentialsBodyHolder[0] = msg.getCredentialsBody();
public void onSuccess(ValidateDeviceCredentialsResponse msg) {
if (!StringUtils.isEmpty(msg.getCredentials())) {
credentialsBodyHolder[0] = msg.getCredentials();
}
latch.countDown();
}

View File

@ -41,9 +41,13 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.msg.EncryptionUtil;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportContext;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.common.transport.service.DefaultTransportService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
@ -365,9 +369,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
ctx.close();
} else {
transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
new TransportServiceCallback<ValidateDeviceCredentialsResponse>() {
@Override
public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
public void onSuccess(ValidateDeviceCredentialsResponse msg) {
onValidateDeviceResponse(msg, ctx);
}
@ -386,9 +390,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
String strCert = SslUtil.getX509CertificateString(cert);
String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
transportService.process(ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(),
new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
new TransportServiceCallback<ValidateDeviceCredentialsResponse>() {
@Override
public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
public void onSuccess(ValidateDeviceCredentialsResponse msg) {
onValidateDeviceResponse(msg, ctx);
}
@ -474,7 +478,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void checkGatewaySession() {
DeviceInfoProto device = deviceSessionCtx.getDeviceInfo();
TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo();
try {
JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
if (infoNode != null) {
@ -504,25 +508,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx) {
private void onValidateDeviceResponse(ValidateDeviceCredentialsResponse msg, ChannelHandlerContext ctx) {
if (!msg.hasDeviceInfo()) {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
ctx.close();
} else {
deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
sessionInfo = SessionInfoProto.newBuilder()
.setNodeId(context.getNodeId())
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setDeviceIdMSB(msg.getDeviceInfo().getDeviceIdMSB())
.setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB())
.setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
.setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
.setDeviceName(msg.getDeviceInfo().getDeviceName())
.setDeviceType(msg.getDeviceInfo().getDeviceType())
.setDeviceProfileIdMSB(msg.getDeviceInfo().getDeviceProfileIdMSB())
.setDeviceProfileIdLSB(msg.getDeviceInfo().getDeviceProfileIdLSB())
.build();
sessionInfo = SessionInfoCreator.create(msg, context, sessionId);
transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() {
@Override
public void onSuccess(Void msg) {

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt.session;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
@ -33,21 +34,23 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
private final GatewaySessionHandler parent;
private final SessionInfoProto sessionInfo;
public GatewayDeviceSessionCtx(GatewaySessionHandler parent, DeviceInfoProto deviceInfo, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
public GatewayDeviceSessionCtx(GatewaySessionHandler parent, TransportDeviceInfo deviceInfo, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
super(UUID.randomUUID(), mqttQoSMap);
this.parent = parent;
this.sessionInfo = SessionInfoProto.newBuilder()
.setNodeId(parent.getNodeId())
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setDeviceIdMSB(deviceInfo.getDeviceIdMSB())
.setDeviceIdLSB(deviceInfo.getDeviceIdLSB())
.setTenantIdMSB(deviceInfo.getTenantIdMSB())
.setTenantIdLSB(deviceInfo.getTenantIdLSB())
.setDeviceIdMSB(deviceInfo.getDeviceId().getId().getMostSignificantBits())
.setDeviceIdLSB(deviceInfo.getDeviceId().getId().getLeastSignificantBits())
.setTenantIdMSB(deviceInfo.getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(deviceInfo.getTenantId().getId().getLeastSignificantBits())
.setDeviceName(deviceInfo.getDeviceName())
.setDeviceType(deviceInfo.getDeviceType())
.setGwSessionIdMSB(parent.getSessionId().getMostSignificantBits())
.setGwSessionIdLSB(parent.getSessionId().getLeastSignificantBits())
.setDeviceProfileIdMSB(deviceInfo.getDeviceProfileId().getId().getMostSignificantBits())
.setDeviceProfileIdLSB(deviceInfo.getDeviceProfileId().getId().getLeastSignificantBits())
.build();
setDeviceInfo(deviceInfo);
}

View File

@ -35,6 +35,8 @@ import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.common.transport.service.DefaultTransportService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
@ -69,7 +71,7 @@ public class GatewaySessionHandler {
private final MqttTransportContext context;
private final TransportService transportService;
private final DeviceInfoProto gateway;
private final TransportDeviceInfo gateway;
private final UUID sessionId;
private final ConcurrentMap<String, Lock> deviceCreationLockMap;
private final ConcurrentMap<String, GatewayDeviceSessionCtx> devices;
@ -140,11 +142,11 @@ public class GatewaySessionHandler {
transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
.setDeviceName(deviceName)
.setDeviceType(deviceType)
.setGatewayIdMSB(gateway.getDeviceIdMSB())
.setGatewayIdLSB(gateway.getDeviceIdLSB()).build(),
new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg>() {
.setGatewayIdMSB(gateway.getDeviceId().getId().getMostSignificantBits())
.setGatewayIdLSB(gateway.getDeviceId().getId().getLeastSignificantBits()).build(),
new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse>() {
@Override
public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) {
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap);
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
@ -218,8 +220,7 @@ public class GatewaySessionHandler {
TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(deviceEntry.getValue().getAsJsonArray());
transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg));
} catch (Throwable e) {
UUID gatewayId = new UUID(gateway.getDeviceIdMSB(), gateway.getDeviceIdLSB());
log.warn("[{}][{}] Failed to convert telemetry: {}", gatewayId, deviceName, deviceEntry.getValue(), e);
log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, deviceEntry.getValue(), e);
}
}
@ -253,8 +254,7 @@ public class GatewaySessionHandler {
TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(deviceId, deviceEntry.getValue());
transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg));
} catch (Throwable e) {
UUID gatewayId = new UUID(gateway.getDeviceIdMSB(), gateway.getDeviceIdLSB());
log.warn("[{}][{}] Failed to convert claim message: {}", gatewayId, deviceName, deviceEntry.getValue(), e);
log.warn("[{}][{}] Failed to convert claim message: {}", gateway.getDeviceId(), deviceName, deviceEntry.getValue(), e);
}
}

View File

@ -17,6 +17,8 @@ package org.thingsboard.server.common.transport;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
@ -44,13 +46,13 @@ public interface TransportService {
GetTenantRoutingInfoResponseMsg getRoutingInfo(GetTenantRoutingInfoRequestMsg msg);
void process(ValidateDeviceTokenRequestMsg msg,
TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback);
TransportServiceCallback<ValidateDeviceCredentialsResponse> callback);
void process(ValidateDeviceX509CertRequestMsg msg,
TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback);
TransportServiceCallback<ValidateDeviceCredentialsResponse> callback);
void process(GetOrCreateDeviceFromGatewayRequestMsg msg,
TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg> callback);
TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse> callback);
void getDeviceProfile(DeviceProfileId deviceProfileId, TransportServiceCallback<DeviceProfile> callback);

View File

@ -0,0 +1,24 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.auth;
import org.thingsboard.server.common.data.DeviceProfile;
public interface DeviceProfileAware {
DeviceProfile getDeviceProfile();
}

View File

@ -0,0 +1,29 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.auth;
import lombok.Builder;
import lombok.Data;
import org.thingsboard.server.common.data.DeviceProfile;
@Data
@Builder
public class GetOrCreateDeviceFromGatewayResponse implements DeviceProfileAware {
private TransportDeviceInfo deviceInfo;
private DeviceProfile deviceProfile;
}

View File

@ -0,0 +1,41 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.auth;
import org.thingsboard.server.common.transport.TransportContext;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.UUID;
public class SessionInfoCreator {
public static TransportProtos.SessionInfoProto create(ValidateDeviceCredentialsResponse msg, TransportContext context, UUID sessionId) {
return TransportProtos.SessionInfoProto.newBuilder()
.setNodeId(context.getNodeId())
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setDeviceIdMSB(msg.getDeviceInfo().getDeviceId().getId().getMostSignificantBits())
.setDeviceIdLSB(msg.getDeviceInfo().getDeviceId().getId().getLeastSignificantBits())
.setTenantIdMSB(msg.getDeviceInfo().getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(msg.getDeviceInfo().getTenantId().getId().getLeastSignificantBits())
.setDeviceName(msg.getDeviceInfo().getDeviceName())
.setDeviceType(msg.getDeviceInfo().getDeviceType())
.setDeviceProfileIdMSB(msg.getDeviceInfo().getDeviceProfileId().getId().getMostSignificantBits())
.setDeviceProfileIdLSB(msg.getDeviceInfo().getDeviceProfileId().getId().getLeastSignificantBits())
.build();
}
}

View File

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.auth;
import lombok.Data;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.TenantId;
@Data
public class TransportDeviceInfo {
private TenantId tenantId;
private DeviceProfileId deviceProfileId;
private DeviceId deviceId;
private String deviceName;
private String deviceType;
private String additionalInfo;
}

View File

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.auth;
import lombok.Builder;
import lombok.Data;
import org.thingsboard.server.common.data.DeviceProfile;
@Data
@Builder
public class ValidateDeviceCredentialsResponse implements DeviceProfileAware {
private final TransportDeviceInfo deviceInfo;
private final DeviceProfile deviceProfile;
private final String credentials;
public boolean hasDeviceInfo() {
return deviceInfo != null;
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.common.transport.service;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.protobuf.ByteString;
@ -43,6 +44,9 @@ import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
import org.thingsboard.server.common.transport.util.JsonUtils;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -248,38 +252,82 @@ public class DefaultTransportService implements TransportService {
}
@Override
public void process(TransportProtos.ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback) {
public void process(TransportProtos.ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponse> callback) {
log.trace("Processing msg: {}", msg);
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build());
process(callback, protoMsg);
doProcess(protoMsg, callback);
}
@Override
public void process(TransportProtos.ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback) {
public void process(TransportProtos.ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponse> callback) {
log.trace("Processing msg: {}", msg);
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build());
process(callback, protoMsg);
doProcess(protoMsg, callback);
}
private void process(TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback, TbProtoQueueMsg<TransportApiRequestMsg> protoMsg) {
ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> result = extractProfile(transportApiRequestTemplate.send(protoMsg),
response -> response.getValidateTokenResponseMsg().hasDeviceInfo(),
response -> response.getValidateTokenResponseMsg().getDeviceInfo(),
response -> response.getValidateTokenResponseMsg().getProfileBody());
AsyncCallbackTemplate.withCallback(result,
response -> callback.onSuccess(response.getValue().getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
private void doProcess(TbProtoQueueMsg<TransportApiRequestMsg> protoMsg, TransportServiceCallback<ValidateDeviceCredentialsResponse> callback) {
ListenableFuture<ValidateDeviceCredentialsResponse> response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> {
TransportProtos.ValidateDeviceCredentialsResponseMsg msg = tmp.getValue().getValidateTokenResponseMsg();
ValidateDeviceCredentialsResponse.ValidateDeviceCredentialsResponseBuilder result = ValidateDeviceCredentialsResponse.builder();
if (msg.hasDeviceInfo()) {
result.credentials(msg.getCredentialsBody());
TransportDeviceInfo tdi = getTransportDeviceInfo(msg.getDeviceInfo());
result.deviceInfo(tdi);
ByteString profileBody = msg.getProfileBody();
if (profileBody != null && !profileBody.isEmpty()) {
DeviceProfile profile = deviceProfiles.get(tdi.getDeviceProfileId());
if (profile == null) {
Optional<DeviceProfile> deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray());
if (deviceProfile.isPresent()) {
profile = deviceProfile.get();
deviceProfiles.put(tdi.getDeviceProfileId(), profile);
}
}
result.deviceProfile(profile);
}
}
return result.build();
}, MoreExecutors.directExecutor());
AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor);
}
@Override
public void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg msg, TransportServiceCallback<TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg> callback) {
log.trace("Processing msg: {}", msg);
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build());
ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> result = extractProfile(transportApiRequestTemplate.send(protoMsg),
response -> response.getGetOrCreateDeviceResponseMsg().hasDeviceInfo(),
response -> response.getGetOrCreateDeviceResponseMsg().getDeviceInfo(),
response -> response.getGetOrCreateDeviceResponseMsg().getProfileBody());
AsyncCallbackTemplate.withCallback(result,
response -> callback.onSuccess(response.getValue().getGetOrCreateDeviceResponseMsg()), callback::onError, transportCallbackExecutor);
public void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg requestMsg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse> callback) {
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(requestMsg).build());
log.trace("Processing msg: {}", requestMsg);
ListenableFuture<GetOrCreateDeviceFromGatewayResponse> response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> {
TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg msg = tmp.getValue().getGetOrCreateDeviceResponseMsg();
GetOrCreateDeviceFromGatewayResponse.GetOrCreateDeviceFromGatewayResponseBuilder result = GetOrCreateDeviceFromGatewayResponse.builder();
if (msg.hasDeviceInfo()) {
TransportDeviceInfo tdi = getTransportDeviceInfo(msg.getDeviceInfo());
result.deviceInfo(tdi);
ByteString profileBody = msg.getProfileBody();
if (profileBody != null && !profileBody.isEmpty()) {
DeviceProfile profile = deviceProfiles.get(tdi.getDeviceProfileId());
if (profile == null) {
Optional<DeviceProfile> deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray());
if (deviceProfile.isPresent()) {
profile = deviceProfile.get();
deviceProfiles.put(tdi.getDeviceProfileId(), profile);
}
}
result.deviceProfile(profile);
}
}
return result.build();
}, MoreExecutors.directExecutor());
AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor);
}
private TransportDeviceInfo getTransportDeviceInfo(TransportProtos.DeviceInfoProto di) {
TransportDeviceInfo tdi = new TransportDeviceInfo();
tdi.setTenantId(new TenantId(new UUID(di.getTenantIdMSB(), di.getTenantIdLSB())));
tdi.setDeviceId(new DeviceId(new UUID(di.getDeviceIdMSB(), di.getDeviceIdLSB())));
tdi.setDeviceProfileId(new DeviceProfileId(new UUID(di.getDeviceProfileIdMSB(), di.getDeviceProfileIdLSB())));
tdi.setAdditionalInfo(di.getAdditionalInfo());
tdi.setDeviceName(di.getDeviceName());
tdi.setDeviceType(di.getDeviceType());
return tdi;
}
@Override

View File

@ -19,6 +19,7 @@ import lombok.Data;
import lombok.Getter;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.msg.session.SessionContext;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
import java.util.UUID;
@ -34,17 +35,17 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
@Getter
private volatile DeviceId deviceId;
@Getter
private volatile DeviceInfoProto deviceInfo;
private volatile TransportDeviceInfo deviceInfo;
private volatile boolean connected;
public DeviceId getDeviceId() {
return deviceId;
}
public void setDeviceInfo(DeviceInfoProto deviceInfo) {
public void setDeviceInfo(TransportDeviceInfo deviceInfo) {
this.deviceInfo = deviceInfo;
this.connected = true;
this.deviceId = new DeviceId(new UUID(deviceInfo.getDeviceIdMSB(), deviceInfo.getDeviceIdLSB()));
this.deviceId = deviceInfo.getDeviceId();
}
public boolean isConnected() {