diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index f5bd42e29d..8b29af34ed 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -18,7 +18,6 @@ package org.thingsboard.server.service.transport; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -101,13 +100,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509Ce import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; -import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.List; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; @@ -136,7 +133,6 @@ public class DefaultTransportApiService implements TransportApiService { private final DeviceProfileService deviceProfileService; private final RelationService relationService; private final DeviceCredentialsService deviceCredentialsService; - private final DbCallbackExecutorService dbCallbackExecutorService; private final TbClusterService tbClusterService; private final DeviceProvisionService deviceProvisionService; private final ResourceService resourceService; @@ -170,52 +166,54 @@ public class DefaultTransportApiService implements TransportApiService { @Override public ListenableFuture> handle(TbProtoQueueMsg tbProtoQueueMsg) { TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue(); - ListenableFuture result = null; + return handlerExecutor.submit(() -> { + TransportApiResponseMsg result = handle(transportApiRequestMsg); + return new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), result, tbProtoQueueMsg.getHeaders()); + }); + } + private TransportApiResponseMsg handle(TransportApiRequestMsg transportApiRequestMsg) { if (transportApiRequestMsg.hasValidateTokenRequestMsg()) { ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg(); final String token = msg.getToken(); - result = handlerExecutor.submit(() -> validateCredentials(token, DeviceCredentialsType.ACCESS_TOKEN)); + return validateCredentials(token, DeviceCredentialsType.ACCESS_TOKEN); } else if (transportApiRequestMsg.hasValidateBasicMqttCredRequestMsg()) { TransportProtos.ValidateBasicMqttCredRequestMsg msg = transportApiRequestMsg.getValidateBasicMqttCredRequestMsg(); - result = handlerExecutor.submit(() -> validateCredentials(msg)); + return validateCredentials(msg); } else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) { ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg(); final String hash = msg.getHash(); - result = handlerExecutor.submit(() -> validateCredentials(hash, DeviceCredentialsType.X509_CERTIFICATE)); + return validateCredentials(hash, DeviceCredentialsType.X509_CERTIFICATE); } else if (transportApiRequestMsg.hasValidateOrCreateX509CertRequestMsg()) { TransportProtos.ValidateOrCreateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateOrCreateX509CertRequestMsg(); final String certChain = msg.getCertificateChain(); - result = handlerExecutor.submit(() -> validateOrCreateDeviceX509Certificate(certChain)); + return validateOrCreateDeviceX509Certificate(certChain); } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) { - result = handlerExecutor.submit(() -> handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg())); + return handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()); } else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) { - result = handle(transportApiRequestMsg.getEntityProfileRequestMsg()); + return handle(transportApiRequestMsg.getEntityProfileRequestMsg()); } else if (transportApiRequestMsg.hasLwM2MRequestMsg()) { - result = handle(transportApiRequestMsg.getLwM2MRequestMsg()); + return handle(transportApiRequestMsg.getLwM2MRequestMsg()); } else if (transportApiRequestMsg.hasValidateDeviceLwM2MCredentialsRequestMsg()) { ValidateDeviceLwM2MCredentialsRequestMsg msg = transportApiRequestMsg.getValidateDeviceLwM2MCredentialsRequestMsg(); final String credentialsId = msg.getCredentialsId(); - result = handlerExecutor.submit(() -> validateCredentials(credentialsId, DeviceCredentialsType.LWM2M_CREDENTIALS)); + return validateCredentials(credentialsId, DeviceCredentialsType.LWM2M_CREDENTIALS); } else if (transportApiRequestMsg.hasProvisionDeviceRequestMsg()) { - result = handle(transportApiRequestMsg.getProvisionDeviceRequestMsg()); + return handle(transportApiRequestMsg.getProvisionDeviceRequestMsg()); } else if (transportApiRequestMsg.hasResourceRequestMsg()) { - result = handle(transportApiRequestMsg.getResourceRequestMsg()); + return handle(transportApiRequestMsg.getResourceRequestMsg()); } else if (transportApiRequestMsg.hasSnmpDevicesRequestMsg()) { - result = handle(transportApiRequestMsg.getSnmpDevicesRequestMsg()); + return handle(transportApiRequestMsg.getSnmpDevicesRequestMsg()); } else if (transportApiRequestMsg.hasDeviceRequestMsg()) { - result = handle(transportApiRequestMsg.getDeviceRequestMsg()); + return handle(transportApiRequestMsg.getDeviceRequestMsg()); } else if (transportApiRequestMsg.hasDeviceCredentialsRequestMsg()) { - result = handle(transportApiRequestMsg.getDeviceCredentialsRequestMsg()); + return handle(transportApiRequestMsg.getDeviceCredentialsRequestMsg()); } else if (transportApiRequestMsg.hasOtaPackageRequestMsg()) { - result = handle(transportApiRequestMsg.getOtaPackageRequestMsg()); + return handle(transportApiRequestMsg.getOtaPackageRequestMsg()); } else if (transportApiRequestMsg.hasGetAllQueueRoutingInfoRequestMsg()) { - return Futures.transform(handle(transportApiRequestMsg.getGetAllQueueRoutingInfoRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); + return handle(transportApiRequestMsg.getGetAllQueueRoutingInfoRequestMsg()); } - - return Futures.transform(Optional.ofNullable(result).orElseGet(this::getEmptyTransportApiResponseFuture), - value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), - MoreExecutors.directExecutor()); + return getEmptyTransportApiResponse(); } private TransportApiResponseMsg validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) { @@ -405,10 +403,10 @@ public class DefaultTransportApiService implements TransportApiService { } } - private ListenableFuture handle(ProvisionDeviceRequestMsg requestMsg) { - ListenableFuture provisionResponseFuture; + private TransportApiResponseMsg handle(ProvisionDeviceRequestMsg requestMsg) { + ProvisionResponse provisionResponse; try { - provisionResponseFuture = Futures.immediateFuture(deviceProvisionService.provisionDevice( + provisionResponse = deviceProvisionService.provisionDevice( new ProvisionRequest( requestMsg.getDeviceName(), requestMsg.getCredentialsType() != null ? DeviceCredentialsType.valueOf(requestMsg.getCredentialsType().name()) : null, @@ -419,18 +417,14 @@ public class DefaultTransportApiService implements TransportApiService { requestMsg.getCredentialsDataProto().getValidateDeviceX509CertRequestMsg().getHash()), new ProvisionDeviceProfileCredentials( requestMsg.getProvisionDeviceCredentialsMsg().getProvisionDeviceKey(), - requestMsg.getProvisionDeviceCredentialsMsg().getProvisionDeviceSecret())))); + requestMsg.getProvisionDeviceCredentialsMsg().getProvisionDeviceSecret()))); } catch (ProvisionFailedException e) { - return Futures.immediateFuture(getTransportApiResponseMsg( - new DeviceCredentials(), - TransportProtos.ResponseStatus.valueOf(e.getMessage()))); + return getTransportApiResponseMsg(new DeviceCredentials(), TransportProtos.ResponseStatus.valueOf(e.getMessage())); } - return Futures.transform(provisionResponseFuture, provisionResponse -> getTransportApiResponseMsg(provisionResponse.getDeviceCredentials(), TransportProtos.ResponseStatus.SUCCESS), - dbCallbackExecutorService); + return getTransportApiResponseMsg(provisionResponse.getDeviceCredentials(), TransportProtos.ResponseStatus.SUCCESS); } - private TransportApiResponseMsg getTransportApiResponseMsg(DeviceCredentials - deviceCredentials, TransportProtos.ResponseStatus status) { + private TransportApiResponseMsg getTransportApiResponseMsg(DeviceCredentials deviceCredentials, TransportProtos.ResponseStatus status) { if (!status.equals(TransportProtos.ResponseStatus.SUCCESS)) { return TransportApiResponseMsg.newBuilder().setProvisionDeviceResponseMsg(TransportProtos.ProvisionDeviceResponseMsg.newBuilder().setStatus(status).build()).build(); } @@ -453,7 +447,7 @@ public class DefaultTransportApiService implements TransportApiService { .build(); } - private ListenableFuture handle(GetEntityProfileRequestMsg requestMsg) { + private TransportApiResponseMsg handle(GetEntityProfileRequestMsg requestMsg) { EntityType entityType = EntityType.valueOf(requestMsg.getEntityType()); UUID entityUuid = new UUID(requestMsg.getEntityIdMSB(), requestMsg.getEntityIdLSB()); GetEntityProfileResponseMsg.Builder builder = GetEntityProfileResponseMsg.newBuilder(); @@ -470,10 +464,10 @@ public class DefaultTransportApiService implements TransportApiService { } else { throw new RuntimeException("Invalid entity profile request: " + entityType); } - return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setEntityProfileResponseMsg(builder).build()); + return TransportApiResponseMsg.newBuilder().setEntityProfileResponseMsg(builder).build(); } - private ListenableFuture handle(GetDeviceRequestMsg requestMsg) { + private TransportApiResponseMsg handle(GetDeviceRequestMsg requestMsg) { DeviceId deviceId = new DeviceId(new UUID(requestMsg.getDeviceIdMSB(), requestMsg.getDeviceIdLSB())); Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId); @@ -491,21 +485,20 @@ public class DefaultTransportApiService implements TransportApiService { } else { responseMsg = TransportApiResponseMsg.getDefaultInstance(); } - - return Futures.immediateFuture(responseMsg); + return responseMsg; } - private ListenableFuture handle(GetDeviceCredentialsRequestMsg requestMsg) { + private TransportApiResponseMsg handle(GetDeviceCredentialsRequestMsg requestMsg) { DeviceId deviceId = new DeviceId(new UUID(requestMsg.getDeviceIdMSB(), requestMsg.getDeviceIdLSB())); DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(TenantId.SYS_TENANT_ID, deviceId); - return Futures.immediateFuture(TransportApiResponseMsg.newBuilder() + return TransportApiResponseMsg.newBuilder() .setDeviceCredentialsResponseMsg(TransportProtos.GetDeviceCredentialsResponseMsg.newBuilder() .setDeviceCredentialsData(ProtoUtils.toProto(deviceCredentials))) - .build()); + .build(); } - private ListenableFuture handle(GetResourceRequestMsg requestMsg) { + private TransportApiResponseMsg handle(GetResourceRequestMsg requestMsg) { TenantId tenantId = TenantId.fromUUID(new UUID(requestMsg.getTenantIdMSB(), requestMsg.getTenantIdLSB())); ResourceType resourceType = ResourceType.valueOf(requestMsg.getResourceType()); String resourceKey = requestMsg.getResourceKey(); @@ -520,10 +513,10 @@ public class DefaultTransportApiService implements TransportApiService { builder.setResource(ProtoUtils.toProto(resource)); } - return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setResourceResponseMsg(builder).build()); + return TransportApiResponseMsg.newBuilder().setResourceResponseMsg(builder).build(); } - private ListenableFuture handle(GetSnmpDevicesRequestMsg requestMsg) { + private TransportApiResponseMsg handle(GetSnmpDevicesRequestMsg requestMsg) { PageLink pageLink = new PageLink(requestMsg.getPageSize(), requestMsg.getPage()); PageData result = deviceService.findDevicesIdsByDeviceProfileTransportType(DeviceTransportType.SNMP, pageLink); @@ -534,9 +527,9 @@ public class DefaultTransportApiService implements TransportApiService { .setHasNextPage(result.hasNext()) .build(); - return Futures.immediateFuture(TransportApiResponseMsg.newBuilder() + return TransportApiResponseMsg.newBuilder() .setSnmpDevicesResponseMsg(responseMsg) - .build()); + .build(); } TransportApiResponseMsg getDeviceInfo(DeviceCredentials credentials) { @@ -565,31 +558,26 @@ public class DefaultTransportApiService implements TransportApiService { } } - private ListenableFuture getEmptyTransportApiResponseFuture() { - return Futures.immediateFuture(getEmptyTransportApiResponse()); - } - private TransportApiResponseMsg getEmptyTransportApiResponse() { return TransportApiResponseMsg.newBuilder() .setValidateCredResponseMsg(ValidateDeviceCredentialsResponseMsg.getDefaultInstance()).build(); } - private ListenableFuture handle(TransportProtos.LwM2MRequestMsg requestMsg) { + private TransportApiResponseMsg handle(TransportProtos.LwM2MRequestMsg requestMsg) { if (requestMsg.hasRegistrationMsg()) { return handleRegistration(requestMsg.getRegistrationMsg()); } else { - return Futures.immediateFailedFuture(new RuntimeException("Not supported!")); + throw new RuntimeException("Not supported!"); } } - private ListenableFuture handle(TransportProtos.GetOtaPackageRequestMsg requestMsg) { + private TransportApiResponseMsg handle(TransportProtos.GetOtaPackageRequestMsg requestMsg) { TenantId tenantId = TenantId.fromUUID(new UUID(requestMsg.getTenantIdMSB(), requestMsg.getTenantIdLSB())); DeviceId deviceId = new DeviceId(new UUID(requestMsg.getDeviceIdMSB(), requestMsg.getDeviceIdLSB())); OtaPackageType otaPackageType = OtaPackageType.valueOf(requestMsg.getType()); Device device = deviceService.findDeviceById(tenantId, deviceId); - if (device == null) { - return getEmptyTransportApiResponseFuture(); + return getEmptyTransportApiResponse(); } OtaPackageId otaPackageId = OtaPackageUtil.getOtaPackageId(device, otaPackageType); @@ -626,14 +614,12 @@ public class DefaultTransportApiService implements TransportApiService { } } - return Futures.immediateFuture( - TransportApiResponseMsg.newBuilder() - .setOtaPackageResponseMsg(builder.build()) - .build()); + return TransportApiResponseMsg.newBuilder() + .setOtaPackageResponseMsg(builder.build()) + .build(); } - private ListenableFuture handleRegistration - (TransportProtos.LwM2MRegistrationRequestMsg msg) { + private TransportApiResponseMsg handleRegistration(TransportProtos.LwM2MRegistrationRequestMsg msg) { TenantId tenantId = TenantId.fromUUID(UUID.fromString(msg.getTenantId())); String deviceName = msg.getEndpoint(); Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(deviceName, id -> new ReentrantLock()); @@ -651,21 +637,18 @@ public class DefaultTransportApiService implements TransportApiService { TransportProtos.LwM2MRegistrationResponseMsg.newBuilder() .setDeviceInfo(ProtoUtils.toDeviceInfoProto(device)).build(); TransportProtos.LwM2MResponseMsg responseMsg = TransportProtos.LwM2MResponseMsg.newBuilder().setRegistrationMsg(registrationResponseMsg).build(); - return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setLwM2MResponseMsg(responseMsg).build()); + return TransportApiResponseMsg.newBuilder().setLwM2MResponseMsg(responseMsg).build(); } catch (JsonProcessingException e) { - log.warn("[{}][{}] Failed to lookup device by gateway id and name", tenantId, deviceName, e); + log.warn("[{}][{}] Failed to lookup device by name", tenantId, deviceName, e); throw new RuntimeException(e); } finally { deviceCreationLock.unlock(); } } - private ListenableFuture handle(TransportProtos.GetAllQueueRoutingInfoRequestMsg requestMsg) { - return queuesToTransportApiResponseMsg(queueService.findAllQueues()); - } - - private ListenableFuture queuesToTransportApiResponseMsg(List queues) { - return Futures.immediateFuture(TransportApiResponseMsg.newBuilder() + private TransportApiResponseMsg handle(TransportProtos.GetAllQueueRoutingInfoRequestMsg requestMsg) { + List queues = queueService.findAllQueues(); + return TransportApiResponseMsg.newBuilder() .addAllGetQueueRoutingInfoResponseMsgs(queues.stream() .map(queue -> TransportProtos.GetQueueRoutingInfoResponseMsg.newBuilder() .setTenantIdMSB(queue.getTenantId().getId().getMostSignificantBits()) @@ -676,7 +659,7 @@ public class DefaultTransportApiService implements TransportApiService { .setQueueTopic(queue.getTopic()) .setPartitions(queue.getPartitions()) .setDuplicateMsgToAllPartitions(queue.isDuplicateMsgToAllPartitions()) - .build()).collect(Collectors.toList())).build()); + .build()).collect(Collectors.toList())).build(); } private ProvisionRequest createProvisionRequest(String certificateValue) {