Async transport api requests processing
This commit is contained in:
parent
c4e5ab65ee
commit
37ad8a79b1
@ -18,7 +18,6 @@ package org.thingsboard.server.service.transport;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
@ -101,13 +100,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509Ce
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
@ -136,7 +133,6 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
private final DeviceProfileService deviceProfileService;
|
||||
private final RelationService relationService;
|
||||
private final DeviceCredentialsService deviceCredentialsService;
|
||||
private final DbCallbackExecutorService dbCallbackExecutorService;
|
||||
private final TbClusterService tbClusterService;
|
||||
private final DeviceProvisionService deviceProvisionService;
|
||||
private final ResourceService resourceService;
|
||||
@ -170,52 +166,54 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
@Override
|
||||
public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProtoQueueMsg<TransportApiRequestMsg> tbProtoQueueMsg) {
|
||||
TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue();
|
||||
ListenableFuture<TransportApiResponseMsg> result = null;
|
||||
return handlerExecutor.submit(() -> {
|
||||
TransportApiResponseMsg result = handle(transportApiRequestMsg);
|
||||
return new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), result, tbProtoQueueMsg.getHeaders());
|
||||
});
|
||||
}
|
||||
|
||||
private TransportApiResponseMsg handle(TransportApiRequestMsg transportApiRequestMsg) {
|
||||
if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
|
||||
ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
|
||||
final String token = msg.getToken();
|
||||
result = handlerExecutor.submit(() -> validateCredentials(token, DeviceCredentialsType.ACCESS_TOKEN));
|
||||
return validateCredentials(token, DeviceCredentialsType.ACCESS_TOKEN);
|
||||
} else if (transportApiRequestMsg.hasValidateBasicMqttCredRequestMsg()) {
|
||||
TransportProtos.ValidateBasicMqttCredRequestMsg msg = transportApiRequestMsg.getValidateBasicMqttCredRequestMsg();
|
||||
result = handlerExecutor.submit(() -> validateCredentials(msg));
|
||||
return validateCredentials(msg);
|
||||
} else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) {
|
||||
ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg();
|
||||
final String hash = msg.getHash();
|
||||
result = handlerExecutor.submit(() -> validateCredentials(hash, DeviceCredentialsType.X509_CERTIFICATE));
|
||||
return validateCredentials(hash, DeviceCredentialsType.X509_CERTIFICATE);
|
||||
} else if (transportApiRequestMsg.hasValidateOrCreateX509CertRequestMsg()) {
|
||||
TransportProtos.ValidateOrCreateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateOrCreateX509CertRequestMsg();
|
||||
final String certChain = msg.getCertificateChain();
|
||||
result = handlerExecutor.submit(() -> validateOrCreateDeviceX509Certificate(certChain));
|
||||
return validateOrCreateDeviceX509Certificate(certChain);
|
||||
} else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) {
|
||||
result = handlerExecutor.submit(() -> handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()));
|
||||
return handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg());
|
||||
} else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) {
|
||||
result = handle(transportApiRequestMsg.getEntityProfileRequestMsg());
|
||||
return handle(transportApiRequestMsg.getEntityProfileRequestMsg());
|
||||
} else if (transportApiRequestMsg.hasLwM2MRequestMsg()) {
|
||||
result = handle(transportApiRequestMsg.getLwM2MRequestMsg());
|
||||
return handle(transportApiRequestMsg.getLwM2MRequestMsg());
|
||||
} else if (transportApiRequestMsg.hasValidateDeviceLwM2MCredentialsRequestMsg()) {
|
||||
ValidateDeviceLwM2MCredentialsRequestMsg msg = transportApiRequestMsg.getValidateDeviceLwM2MCredentialsRequestMsg();
|
||||
final String credentialsId = msg.getCredentialsId();
|
||||
result = handlerExecutor.submit(() -> validateCredentials(credentialsId, DeviceCredentialsType.LWM2M_CREDENTIALS));
|
||||
return validateCredentials(credentialsId, DeviceCredentialsType.LWM2M_CREDENTIALS);
|
||||
} else if (transportApiRequestMsg.hasProvisionDeviceRequestMsg()) {
|
||||
result = handle(transportApiRequestMsg.getProvisionDeviceRequestMsg());
|
||||
return handle(transportApiRequestMsg.getProvisionDeviceRequestMsg());
|
||||
} else if (transportApiRequestMsg.hasResourceRequestMsg()) {
|
||||
result = handle(transportApiRequestMsg.getResourceRequestMsg());
|
||||
return handle(transportApiRequestMsg.getResourceRequestMsg());
|
||||
} else if (transportApiRequestMsg.hasSnmpDevicesRequestMsg()) {
|
||||
result = handle(transportApiRequestMsg.getSnmpDevicesRequestMsg());
|
||||
return handle(transportApiRequestMsg.getSnmpDevicesRequestMsg());
|
||||
} else if (transportApiRequestMsg.hasDeviceRequestMsg()) {
|
||||
result = handle(transportApiRequestMsg.getDeviceRequestMsg());
|
||||
return handle(transportApiRequestMsg.getDeviceRequestMsg());
|
||||
} else if (transportApiRequestMsg.hasDeviceCredentialsRequestMsg()) {
|
||||
result = handle(transportApiRequestMsg.getDeviceCredentialsRequestMsg());
|
||||
return handle(transportApiRequestMsg.getDeviceCredentialsRequestMsg());
|
||||
} else if (transportApiRequestMsg.hasOtaPackageRequestMsg()) {
|
||||
result = handle(transportApiRequestMsg.getOtaPackageRequestMsg());
|
||||
return handle(transportApiRequestMsg.getOtaPackageRequestMsg());
|
||||
} else if (transportApiRequestMsg.hasGetAllQueueRoutingInfoRequestMsg()) {
|
||||
return Futures.transform(handle(transportApiRequestMsg.getGetAllQueueRoutingInfoRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor());
|
||||
return handle(transportApiRequestMsg.getGetAllQueueRoutingInfoRequestMsg());
|
||||
}
|
||||
|
||||
return Futures.transform(Optional.ofNullable(result).orElseGet(this::getEmptyTransportApiResponseFuture),
|
||||
value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()),
|
||||
MoreExecutors.directExecutor());
|
||||
return getEmptyTransportApiResponse();
|
||||
}
|
||||
|
||||
private TransportApiResponseMsg validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
|
||||
@ -405,10 +403,10 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
}
|
||||
}
|
||||
|
||||
private ListenableFuture<TransportApiResponseMsg> handle(ProvisionDeviceRequestMsg requestMsg) {
|
||||
ListenableFuture<ProvisionResponse> provisionResponseFuture;
|
||||
private TransportApiResponseMsg handle(ProvisionDeviceRequestMsg requestMsg) {
|
||||
ProvisionResponse provisionResponse;
|
||||
try {
|
||||
provisionResponseFuture = Futures.immediateFuture(deviceProvisionService.provisionDevice(
|
||||
provisionResponse = deviceProvisionService.provisionDevice(
|
||||
new ProvisionRequest(
|
||||
requestMsg.getDeviceName(),
|
||||
requestMsg.getCredentialsType() != null ? DeviceCredentialsType.valueOf(requestMsg.getCredentialsType().name()) : null,
|
||||
@ -419,18 +417,14 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
requestMsg.getCredentialsDataProto().getValidateDeviceX509CertRequestMsg().getHash()),
|
||||
new ProvisionDeviceProfileCredentials(
|
||||
requestMsg.getProvisionDeviceCredentialsMsg().getProvisionDeviceKey(),
|
||||
requestMsg.getProvisionDeviceCredentialsMsg().getProvisionDeviceSecret()))));
|
||||
requestMsg.getProvisionDeviceCredentialsMsg().getProvisionDeviceSecret())));
|
||||
} catch (ProvisionFailedException e) {
|
||||
return Futures.immediateFuture(getTransportApiResponseMsg(
|
||||
new DeviceCredentials(),
|
||||
TransportProtos.ResponseStatus.valueOf(e.getMessage())));
|
||||
return getTransportApiResponseMsg(new DeviceCredentials(), TransportProtos.ResponseStatus.valueOf(e.getMessage()));
|
||||
}
|
||||
return Futures.transform(provisionResponseFuture, provisionResponse -> getTransportApiResponseMsg(provisionResponse.getDeviceCredentials(), TransportProtos.ResponseStatus.SUCCESS),
|
||||
dbCallbackExecutorService);
|
||||
return getTransportApiResponseMsg(provisionResponse.getDeviceCredentials(), TransportProtos.ResponseStatus.SUCCESS);
|
||||
}
|
||||
|
||||
private TransportApiResponseMsg getTransportApiResponseMsg(DeviceCredentials
|
||||
deviceCredentials, TransportProtos.ResponseStatus status) {
|
||||
private TransportApiResponseMsg getTransportApiResponseMsg(DeviceCredentials deviceCredentials, TransportProtos.ResponseStatus status) {
|
||||
if (!status.equals(TransportProtos.ResponseStatus.SUCCESS)) {
|
||||
return TransportApiResponseMsg.newBuilder().setProvisionDeviceResponseMsg(TransportProtos.ProvisionDeviceResponseMsg.newBuilder().setStatus(status).build()).build();
|
||||
}
|
||||
@ -453,7 +447,7 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
.build();
|
||||
}
|
||||
|
||||
private ListenableFuture<TransportApiResponseMsg> handle(GetEntityProfileRequestMsg requestMsg) {
|
||||
private TransportApiResponseMsg handle(GetEntityProfileRequestMsg requestMsg) {
|
||||
EntityType entityType = EntityType.valueOf(requestMsg.getEntityType());
|
||||
UUID entityUuid = new UUID(requestMsg.getEntityIdMSB(), requestMsg.getEntityIdLSB());
|
||||
GetEntityProfileResponseMsg.Builder builder = GetEntityProfileResponseMsg.newBuilder();
|
||||
@ -470,10 +464,10 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
} else {
|
||||
throw new RuntimeException("Invalid entity profile request: " + entityType);
|
||||
}
|
||||
return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setEntityProfileResponseMsg(builder).build());
|
||||
return TransportApiResponseMsg.newBuilder().setEntityProfileResponseMsg(builder).build();
|
||||
}
|
||||
|
||||
private ListenableFuture<TransportApiResponseMsg> handle(GetDeviceRequestMsg requestMsg) {
|
||||
private TransportApiResponseMsg handle(GetDeviceRequestMsg requestMsg) {
|
||||
DeviceId deviceId = new DeviceId(new UUID(requestMsg.getDeviceIdMSB(), requestMsg.getDeviceIdLSB()));
|
||||
Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId);
|
||||
|
||||
@ -491,21 +485,20 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
} else {
|
||||
responseMsg = TransportApiResponseMsg.getDefaultInstance();
|
||||
}
|
||||
|
||||
return Futures.immediateFuture(responseMsg);
|
||||
return responseMsg;
|
||||
}
|
||||
|
||||
private ListenableFuture<TransportApiResponseMsg> handle(GetDeviceCredentialsRequestMsg requestMsg) {
|
||||
private TransportApiResponseMsg handle(GetDeviceCredentialsRequestMsg requestMsg) {
|
||||
DeviceId deviceId = new DeviceId(new UUID(requestMsg.getDeviceIdMSB(), requestMsg.getDeviceIdLSB()));
|
||||
DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(TenantId.SYS_TENANT_ID, deviceId);
|
||||
|
||||
return Futures.immediateFuture(TransportApiResponseMsg.newBuilder()
|
||||
return TransportApiResponseMsg.newBuilder()
|
||||
.setDeviceCredentialsResponseMsg(TransportProtos.GetDeviceCredentialsResponseMsg.newBuilder()
|
||||
.setDeviceCredentialsData(ProtoUtils.toProto(deviceCredentials)))
|
||||
.build());
|
||||
.build();
|
||||
}
|
||||
|
||||
private ListenableFuture<TransportApiResponseMsg> handle(GetResourceRequestMsg requestMsg) {
|
||||
private TransportApiResponseMsg handle(GetResourceRequestMsg requestMsg) {
|
||||
TenantId tenantId = TenantId.fromUUID(new UUID(requestMsg.getTenantIdMSB(), requestMsg.getTenantIdLSB()));
|
||||
ResourceType resourceType = ResourceType.valueOf(requestMsg.getResourceType());
|
||||
String resourceKey = requestMsg.getResourceKey();
|
||||
@ -520,10 +513,10 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
builder.setResource(ProtoUtils.toProto(resource));
|
||||
}
|
||||
|
||||
return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setResourceResponseMsg(builder).build());
|
||||
return TransportApiResponseMsg.newBuilder().setResourceResponseMsg(builder).build();
|
||||
}
|
||||
|
||||
private ListenableFuture<TransportApiResponseMsg> handle(GetSnmpDevicesRequestMsg requestMsg) {
|
||||
private TransportApiResponseMsg handle(GetSnmpDevicesRequestMsg requestMsg) {
|
||||
PageLink pageLink = new PageLink(requestMsg.getPageSize(), requestMsg.getPage());
|
||||
PageData<UUID> result = deviceService.findDevicesIdsByDeviceProfileTransportType(DeviceTransportType.SNMP, pageLink);
|
||||
|
||||
@ -534,9 +527,9 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
.setHasNextPage(result.hasNext())
|
||||
.build();
|
||||
|
||||
return Futures.immediateFuture(TransportApiResponseMsg.newBuilder()
|
||||
return TransportApiResponseMsg.newBuilder()
|
||||
.setSnmpDevicesResponseMsg(responseMsg)
|
||||
.build());
|
||||
.build();
|
||||
}
|
||||
|
||||
TransportApiResponseMsg getDeviceInfo(DeviceCredentials credentials) {
|
||||
@ -565,31 +558,26 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
}
|
||||
}
|
||||
|
||||
private ListenableFuture<TransportApiResponseMsg> getEmptyTransportApiResponseFuture() {
|
||||
return Futures.immediateFuture(getEmptyTransportApiResponse());
|
||||
}
|
||||
|
||||
private TransportApiResponseMsg getEmptyTransportApiResponse() {
|
||||
return TransportApiResponseMsg.newBuilder()
|
||||
.setValidateCredResponseMsg(ValidateDeviceCredentialsResponseMsg.getDefaultInstance()).build();
|
||||
}
|
||||
|
||||
private ListenableFuture<TransportApiResponseMsg> handle(TransportProtos.LwM2MRequestMsg requestMsg) {
|
||||
private TransportApiResponseMsg handle(TransportProtos.LwM2MRequestMsg requestMsg) {
|
||||
if (requestMsg.hasRegistrationMsg()) {
|
||||
return handleRegistration(requestMsg.getRegistrationMsg());
|
||||
} else {
|
||||
return Futures.immediateFailedFuture(new RuntimeException("Not supported!"));
|
||||
throw new RuntimeException("Not supported!");
|
||||
}
|
||||
}
|
||||
|
||||
private ListenableFuture<TransportApiResponseMsg> handle(TransportProtos.GetOtaPackageRequestMsg requestMsg) {
|
||||
private TransportApiResponseMsg handle(TransportProtos.GetOtaPackageRequestMsg requestMsg) {
|
||||
TenantId tenantId = TenantId.fromUUID(new UUID(requestMsg.getTenantIdMSB(), requestMsg.getTenantIdLSB()));
|
||||
DeviceId deviceId = new DeviceId(new UUID(requestMsg.getDeviceIdMSB(), requestMsg.getDeviceIdLSB()));
|
||||
OtaPackageType otaPackageType = OtaPackageType.valueOf(requestMsg.getType());
|
||||
Device device = deviceService.findDeviceById(tenantId, deviceId);
|
||||
|
||||
if (device == null) {
|
||||
return getEmptyTransportApiResponseFuture();
|
||||
return getEmptyTransportApiResponse();
|
||||
}
|
||||
|
||||
OtaPackageId otaPackageId = OtaPackageUtil.getOtaPackageId(device, otaPackageType);
|
||||
@ -626,14 +614,12 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
}
|
||||
}
|
||||
|
||||
return Futures.immediateFuture(
|
||||
TransportApiResponseMsg.newBuilder()
|
||||
.setOtaPackageResponseMsg(builder.build())
|
||||
.build());
|
||||
return TransportApiResponseMsg.newBuilder()
|
||||
.setOtaPackageResponseMsg(builder.build())
|
||||
.build();
|
||||
}
|
||||
|
||||
private ListenableFuture<TransportApiResponseMsg> handleRegistration
|
||||
(TransportProtos.LwM2MRegistrationRequestMsg msg) {
|
||||
private TransportApiResponseMsg handleRegistration(TransportProtos.LwM2MRegistrationRequestMsg msg) {
|
||||
TenantId tenantId = TenantId.fromUUID(UUID.fromString(msg.getTenantId()));
|
||||
String deviceName = msg.getEndpoint();
|
||||
Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(deviceName, id -> new ReentrantLock());
|
||||
@ -651,21 +637,18 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
TransportProtos.LwM2MRegistrationResponseMsg.newBuilder()
|
||||
.setDeviceInfo(ProtoUtils.toDeviceInfoProto(device)).build();
|
||||
TransportProtos.LwM2MResponseMsg responseMsg = TransportProtos.LwM2MResponseMsg.newBuilder().setRegistrationMsg(registrationResponseMsg).build();
|
||||
return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setLwM2MResponseMsg(responseMsg).build());
|
||||
return TransportApiResponseMsg.newBuilder().setLwM2MResponseMsg(responseMsg).build();
|
||||
} catch (JsonProcessingException e) {
|
||||
log.warn("[{}][{}] Failed to lookup device by gateway id and name", tenantId, deviceName, e);
|
||||
log.warn("[{}][{}] Failed to lookup device by name", tenantId, deviceName, e);
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
deviceCreationLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private ListenableFuture<TransportApiResponseMsg> handle(TransportProtos.GetAllQueueRoutingInfoRequestMsg requestMsg) {
|
||||
return queuesToTransportApiResponseMsg(queueService.findAllQueues());
|
||||
}
|
||||
|
||||
private ListenableFuture<TransportApiResponseMsg> queuesToTransportApiResponseMsg(List<Queue> queues) {
|
||||
return Futures.immediateFuture(TransportApiResponseMsg.newBuilder()
|
||||
private TransportApiResponseMsg handle(TransportProtos.GetAllQueueRoutingInfoRequestMsg requestMsg) {
|
||||
List<Queue> queues = queueService.findAllQueues();
|
||||
return TransportApiResponseMsg.newBuilder()
|
||||
.addAllGetQueueRoutingInfoResponseMsgs(queues.stream()
|
||||
.map(queue -> TransportProtos.GetQueueRoutingInfoResponseMsg.newBuilder()
|
||||
.setTenantIdMSB(queue.getTenantId().getId().getMostSignificantBits())
|
||||
@ -676,7 +659,7 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
.setQueueTopic(queue.getTopic())
|
||||
.setPartitions(queue.getPartitions())
|
||||
.setDuplicateMsgToAllPartitions(queue.isDuplicateMsgToAllPartitions())
|
||||
.build()).collect(Collectors.toList())).build());
|
||||
.build()).collect(Collectors.toList())).build();
|
||||
}
|
||||
|
||||
private ProvisionRequest createProvisionRequest(String certificateValue) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user