diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index 89a7480708..0c29b19fd9 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -20,12 +20,16 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import org.springframework.util.ConcurrentReferenceHashMap; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.cache.ota.OtaPackageDataCache; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageState; @@ -104,10 +108,11 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.resource.TbResourceService; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -144,12 +149,28 @@ public class DefaultTransportApiService implements TransportApiService { private final OtaPackageDataCache otaPackageDataCache; private final QueueService queueService; - private final ConcurrentMap deviceCreationLocks = new ConcurrentHashMap<>(); + private final ConcurrentMap deviceCreationLocks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); + + @Value("${queue.transport_api.max_core_handler_threads:16}") + private int maxCoreHandlerThreads; + + ListeningExecutorService handlerExecutor; private static boolean checkIsMqttCredentials(DeviceCredentials credentials) { return credentials != null && DeviceCredentialsType.MQTT_BASIC.equals(credentials.getCredentialsType()); } + @PostConstruct + public void init() { + handlerExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(maxCoreHandlerThreads, "transport-api-service-core-handler")); + } + @PreDestroy + public void destroy() { + if (handlerExecutor != null) { + handlerExecutor.shutdownNow(); + } + } + @Override public ListenableFuture> handle(TbProtoQueueMsg tbProtoQueueMsg) { TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue(); @@ -157,16 +178,19 @@ public class DefaultTransportApiService implements TransportApiService { if (transportApiRequestMsg.hasValidateTokenRequestMsg()) { ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg(); - result = validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN); + final String token = msg.getToken(); + result = handlerExecutor.submit(() -> validateCredentials(token, DeviceCredentialsType.ACCESS_TOKEN)); } else if (transportApiRequestMsg.hasValidateBasicMqttCredRequestMsg()) { TransportProtos.ValidateBasicMqttCredRequestMsg msg = transportApiRequestMsg.getValidateBasicMqttCredRequestMsg(); - result = validateCredentials(msg); + result = handlerExecutor.submit(() -> validateCredentials(msg)); } else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) { ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg(); - result = validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE); + final String hash = msg.getHash(); + result = handlerExecutor.submit(() -> validateCredentials(hash, DeviceCredentialsType.X509_CERTIFICATE)); } else if (transportApiRequestMsg.hasValidateOrCreateX509CertRequestMsg()) { TransportProtos.ValidateOrCreateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateOrCreateX509CertRequestMsg(); - result = validateOrCreateDeviceX509Certificate(msg.getCertificateChain()); + final String certChain = msg.getCertificateChain(); + result = handlerExecutor.submit(() -> validateOrCreateDeviceX509Certificate(certChain)); } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) { result = handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()); } else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) { @@ -175,7 +199,8 @@ public class DefaultTransportApiService implements TransportApiService { result = handle(transportApiRequestMsg.getLwM2MRequestMsg()); } else if (transportApiRequestMsg.hasValidateDeviceLwM2MCredentialsRequestMsg()) { ValidateDeviceLwM2MCredentialsRequestMsg msg = transportApiRequestMsg.getValidateDeviceLwM2MCredentialsRequestMsg(); - result = validateCredentials(msg.getCredentialsId(), DeviceCredentialsType.LWM2M_CREDENTIALS); + final String credentialsId = msg.getCredentialsId(); + result = handlerExecutor.submit(() -> validateCredentials(credentialsId, DeviceCredentialsType.LWM2M_CREDENTIALS)); } else if (transportApiRequestMsg.hasProvisionDeviceRequestMsg()) { result = handle(transportApiRequestMsg.getProvisionDeviceRequestMsg()); } else if (transportApiRequestMsg.hasResourceRequestMsg()) { @@ -197,24 +222,24 @@ public class DefaultTransportApiService implements TransportApiService { MoreExecutors.directExecutor()); } - private ListenableFuture validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) { + private TransportApiResponseMsg validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) { //TODO: Make async and enable caching DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId); if (credentials != null && credentials.getCredentialsType() == credentialsType) { return getDeviceInfo(credentials); } else { - return getEmptyTransportApiResponseFuture(); + return getEmptyTransportApiResponse(); } } - private ListenableFuture validateCredentials(TransportProtos.ValidateBasicMqttCredRequestMsg mqtt) { + private TransportApiResponseMsg validateCredentials(TransportProtos.ValidateBasicMqttCredRequestMsg mqtt) { DeviceCredentials credentials; if (StringUtils.isEmpty(mqtt.getUserName())) { credentials = checkMqttCredentials(mqtt, EncryptionUtil.getSha3Hash(mqtt.getClientId())); if (credentials != null) { return getDeviceInfo(credentials); } else { - return getEmptyTransportApiResponseFuture(); + return getEmptyTransportApiResponse(); } } else { credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId( @@ -224,7 +249,7 @@ public class DefaultTransportApiService implements TransportApiService { if (VALID.equals(validationResult)) { return getDeviceInfo(credentials); } else if (PASSWORD_MISMATCH.equals(validationResult)) { - return getEmptyTransportApiResponseFuture(); + return getEmptyTransportApiResponse(); } else { return validateUserNameCredentials(mqtt); } @@ -234,7 +259,7 @@ public class DefaultTransportApiService implements TransportApiService { } } - protected ListenableFuture validateOrCreateDeviceX509Certificate(String certificateChain) { + protected TransportApiResponseMsg validateOrCreateDeviceX509Certificate(String certificateChain) { List chain = X509_CERTIFICATE_TRIM_CHAIN_PATTERN.matcher(certificateChain).results().map(match -> EncryptionUtil.certTrimNewLines(match.group())).collect(Collectors.toList()); for (String certificateValue : chain) { @@ -254,16 +279,16 @@ public class DefaultTransportApiService implements TransportApiService { } } catch (ProvisionFailedException e) { log.debug("[{}][{}] Failed to provision device with cert chain: {}", deviceProfile.getTenantId(), deviceProfile.getId(), provisionRequest, e); - return getEmptyTransportApiResponseFuture(); + return getEmptyTransportApiResponse(); } } else if (deviceProfile != null) { log.warn("[{}][{}] Device Profile provision configuration mismatched: expected {}, actual {}", deviceProfile.getTenantId(), deviceProfile.getId(), DeviceProfileProvisionType.X509_CERTIFICATE_CHAIN, deviceProfile.getProvisionType()); } } - return getEmptyTransportApiResponseFuture(); + return getEmptyTransportApiResponse(); } - private ListenableFuture validateUserNameCredentials(TransportProtos.ValidateBasicMqttCredRequestMsg mqtt) { + private TransportApiResponseMsg validateUserNameCredentials(TransportProtos.ValidateBasicMqttCredRequestMsg mqtt) { DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(mqtt.getUserName()); if (credentials != null) { switch (credentials.getCredentialsType()) { @@ -273,11 +298,11 @@ public class DefaultTransportApiService implements TransportApiService { if (VALID.equals(validateMqttCredentials(mqtt, credentials))) { return getDeviceInfo(credentials); } else { - return getEmptyTransportApiResponseFuture(); + return getEmptyTransportApiResponse(); } } } - return getEmptyTransportApiResponseFuture(); + return getEmptyTransportApiResponse(); } private DeviceCredentials checkMqttCredentials(TransportProtos.ValidateBasicMqttCredRequestMsg clientCred, String credId) { @@ -518,31 +543,30 @@ public class DefaultTransportApiService implements TransportApiService { .build()); } - private ListenableFuture getDeviceInfo(DeviceCredentials credentials) { - return Futures.transform(deviceService.findDeviceByIdAsync(TenantId.SYS_TENANT_ID, credentials.getDeviceId()), device -> { - if (device == null) { - log.trace("[{}] Failed to lookup device by id", credentials.getDeviceId()); - return getEmptyTransportApiResponse(); + TransportApiResponseMsg getDeviceInfo(DeviceCredentials credentials) { + Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, credentials.getDeviceId()); + if (device == null) { + log.trace("[{}] Failed to lookup device by id", credentials.getDeviceId()); + return getEmptyTransportApiResponse(); + } + try { + ValidateDeviceCredentialsResponseMsg.Builder builder = ValidateDeviceCredentialsResponseMsg.newBuilder(); + builder.setDeviceInfo(getDeviceInfoProto(device)); + DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId()); + if (deviceProfile != null) { + builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); + } else { + log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId()); } - try { - ValidateDeviceCredentialsResponseMsg.Builder builder = ValidateDeviceCredentialsResponseMsg.newBuilder(); - builder.setDeviceInfo(getDeviceInfoProto(device)); - DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId()); - if (deviceProfile != null) { - builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); - } else { - log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId()); - } - if (!StringUtils.isEmpty(credentials.getCredentialsValue())) { - builder.setCredentialsBody(credentials.getCredentialsValue()); - } - return TransportApiResponseMsg.newBuilder() - .setValidateCredResponseMsg(builder.build()).build(); - } catch (JsonProcessingException e) { - log.warn("[{}] Failed to lookup device by id", credentials.getDeviceId(), e); - return getEmptyTransportApiResponse(); + if (!StringUtils.isEmpty(credentials.getCredentialsValue())) { + builder.setCredentialsBody(credentials.getCredentialsValue()); } - }, MoreExecutors.directExecutor()); + return TransportApiResponseMsg.newBuilder() + .setValidateCredResponseMsg(builder.build()).build(); + } catch (JsonProcessingException e) { + log.warn("[{}] Failed to lookup device by id", credentials.getDeviceId(), e); + return getEmptyTransportApiResponse(); + } } private DeviceInfoProto getDeviceInfoProto(Device device) throws JsonProcessingException { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index cf81e28f39..999c3c0fff 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1491,6 +1491,8 @@ queue: max_requests_timeout: "${TB_QUEUE_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}" # Amount of threads used to invoke callbacks max_callback_threads: "${TB_QUEUE_TRANSPORT_MAX_CALLBACK_THREADS:100}" + # Amount of threads used for transport API requests + max_core_handler_threads: "${TB_QUEUE_TRANSPORT_MAX_CORE_HANDLER_THREADS:16}" # Interval in milliseconds to poll api requests from transport microservices request_poll_interval: "${TB_QUEUE_TRANSPORT_REQUEST_POLL_INTERVAL_MS:25}" # Interval in milliseconds to poll api response from transport microservices diff --git a/application/src/test/java/org/thingsboard/server/service/transport/DefaultTransportApiServiceTest.java b/application/src/test/java/org/thingsboard/server/service/transport/DefaultTransportApiServiceTest.java index 9687225bd1..33c11ea91d 100644 --- a/application/src/test/java/org/thingsboard/server/service/transport/DefaultTransportApiServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/transport/DefaultTransportApiServiceTest.java @@ -16,7 +16,6 @@ package org.thingsboard.server.service.transport; -import com.google.common.util.concurrent.Futures; import lombok.extern.slf4j.Slf4j; import org.junit.Before; import org.junit.Test; @@ -46,6 +45,7 @@ import org.thingsboard.server.dao.ota.OtaPackageService; import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; @@ -62,6 +62,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -123,11 +125,13 @@ public class DefaultTransportApiServiceTest { @Test public void validateExistingDeviceByX509CertificateStrategy() { var device = createDevice(); - when(deviceService.findDeviceByIdAsync(any(), any())).thenReturn(Futures.immediateFuture(device)); var deviceCredentials = createDeviceCredentials(chain[0], device.getId()); when(deviceCredentialsService.findDeviceCredentialsByCredentialsId(any())).thenReturn(deviceCredentials); + TransportProtos.TransportApiResponseMsg response = mock(TransportProtos.TransportApiResponseMsg.class); + willReturn(response).given(service).getDeviceInfo(deviceCredentials); + service.validateOrCreateDeviceX509Certificate(certificateChain); verify(deviceCredentialsService, times(1)).findDeviceCredentialsByCredentialsId(any()); } @@ -139,7 +143,6 @@ public class DefaultTransportApiServiceTest { var device = createDevice(); when(deviceService.findDeviceByTenantIdAndName(any(), any())).thenReturn(device); - when(deviceService.findDeviceByIdAsync(any(), any())).thenReturn(Futures.immediateFuture(device)); var deviceCredentials = createDeviceCredentials(chain[0], device.getId()); when(deviceCredentialsService.findDeviceCredentialsByCredentialsId(any())).thenReturn(null); @@ -148,9 +151,12 @@ public class DefaultTransportApiServiceTest { var provisionResponse = createProvisionResponse(deviceCredentials); when(deviceProvisionService.provisionDeviceViaX509Chain(any(), any())).thenReturn(provisionResponse); + TransportProtos.TransportApiResponseMsg response = mock(TransportProtos.TransportApiResponseMsg.class); + willReturn(response).given(service).getDeviceInfo(deviceCredentials); + service.validateOrCreateDeviceX509Certificate(certificateChain); verify(deviceProfileService, times(1)).findDeviceProfileByProvisionDeviceKey(any()); - verify(deviceService, times(1)).findDeviceByIdAsync(any(), any()); + verify(service, times(1)).getDeviceInfo(any()); verify(deviceCredentialsService, times(1)).findDeviceCredentialsByCredentialsId(any()); verify(deviceProvisionService, times(1)).provisionDeviceViaX509Chain(any(), any()); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 5cd6ba9145..fd0297203d 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -188,7 +188,7 @@ public abstract class AbstractGatewaySessionHandler() { @Override public void onSuccess(@Nullable T result) { ack(msg, ReturnCode.SUCCESS); - log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName); + log.trace("[{}][{}][{}] onDeviceConnectOk: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName); } @Override public void onFailure(Throwable t) { - log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, t); + log.warn("[{}][{}][{}] Failed to process device connect command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t); } }, context.getExecutor()); @@ -259,7 +259,7 @@ public abstract class AbstractGatewaySessionHandler checkDeviceConnected(String deviceName) { T ctx = devices.get(deviceName); if (ctx == null) { - log.debug("[{}] Missing device [{}] for the gateway session", sessionId, deviceName); + log.debug("[{}][{}][{}] Missing device [{}] for the gateway session", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName); return onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE); } else { return Futures.immediateFuture(ctx); @@ -733,7 +733,7 @@ public abstract class AbstractGatewaySessionHandler() { @Override public void onSuccess(Void dummy) { - log.trace("[{}][{}] Published msg: {}", sessionId, deviceName, msg); + log.trace("[{}][{}][{}][{}] Published msg: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, msg); if (msgId > 0) { ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, ReturnCode.SUCCESS)); } @@ -757,7 +757,7 @@ public abstract class AbstractGatewaySessionHandler deviceCredentialsDao.findByCredentialsId(TenantId.SYS_TENANT_ID, credentialsId), - false); + true); // caching null values is essential for permanently invalid requests } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceCredentialsDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceCredentialsDao.java index 2b07b0518e..80ea66d5bd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceCredentialsDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceCredentialsDao.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.dao.sql.device; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Component; @@ -33,6 +34,7 @@ import java.util.UUID; /** * Created by Valerii Sosliuk on 5/6/2017. */ +@Slf4j @Component @SqlDao public class JpaDeviceCredentialsDao extends JpaAbstractDao implements DeviceCredentialsDao { @@ -65,6 +67,7 @@ public class JpaDeviceCredentialsDao extends JpaAbstractDao