From 3b8c6869bde9a814540d856faaff04671060e239 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 6 Nov 2023 18:06:05 +0200 Subject: [PATCH] Use new thread pool for handling gateway requests --- .../transport/DefaultTransportApiService.java | 131 +++++++++--------- 1 file changed, 64 insertions(+), 67 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index 0c29b19fd9..3505052fca 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -192,7 +192,7 @@ public class DefaultTransportApiService implements TransportApiService { final String certChain = msg.getCertificateChain(); result = handlerExecutor.submit(() -> validateOrCreateDeviceX509Certificate(certChain)); } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) { - result = handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()); + result = handlerExecutor.submit(() -> handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg())); } else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) { result = handle(transportApiRequestMsg.getEntityProfileRequestMsg()); } else if (transportApiRequestMsg.hasLwM2MRequestMsg()) { @@ -223,7 +223,6 @@ public class DefaultTransportApiService implements TransportApiService { } private TransportApiResponseMsg validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) { - //TODO: Make async and enable caching DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId); if (credentials != null && credentials.getCredentialsType() == credentialsType) { return getDeviceInfo(credentials); @@ -336,76 +335,74 @@ public class DefaultTransportApiService implements TransportApiService { return VALID; } - private ListenableFuture handle(GetOrCreateDeviceFromGatewayRequestMsg requestMsg) { + private TransportApiResponseMsg handle(GetOrCreateDeviceFromGatewayRequestMsg requestMsg) { DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB())); - ListenableFuture gatewayFuture = deviceService.findDeviceByIdAsync(TenantId.SYS_TENANT_ID, gatewayId); - return Futures.transform(gatewayFuture, gateway -> { - Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock()); - deviceCreationLock.lock(); - try { - Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName()); - if (device == null) { - TenantId tenantId = gateway.getTenantId(); - device = new Device(); - device.setTenantId(tenantId); - device.setName(requestMsg.getDeviceName()); - device.setType(requestMsg.getDeviceType()); - device.setCustomerId(gateway.getCustomerId()); - DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType()); + Device gateway = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, gatewayId); + Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock()); + deviceCreationLock.lock(); + try { + Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName()); + if (device == null) { + TenantId tenantId = gateway.getTenantId(); + device = new Device(); + device.setTenantId(tenantId); + device.setName(requestMsg.getDeviceName()); + device.setType(requestMsg.getDeviceType()); + device.setCustomerId(gateway.getCustomerId()); + DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType()); - device.setDeviceProfileId(deviceProfile.getId()); - ObjectNode additionalInfo = JacksonUtil.newObjectNode(); - additionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString()); - device.setAdditionalInfo(additionalInfo); + device.setDeviceProfileId(deviceProfile.getId()); + ObjectNode additionalInfo = JacksonUtil.newObjectNode(); + additionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString()); + device.setAdditionalInfo(additionalInfo); + Device savedDevice = deviceService.saveDevice(device); + tbClusterService.onDeviceUpdated(savedDevice, null); + device = savedDevice; + + relationService.saveRelation(TenantId.SYS_TENANT_ID, new EntityRelation(gateway.getId(), device.getId(), "Created")); + + TbMsgMetaData metaData = new TbMsgMetaData(); + CustomerId customerId = gateway.getCustomerId(); + if (customerId != null && !customerId.isNullUid()) { + metaData.putValue("customerId", customerId.toString()); + } + metaData.putValue("gatewayId", gatewayId.toString()); + + DeviceId deviceId = device.getId(); + JsonNode entityNode = JacksonUtil.valueToTree(device); + TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, deviceId, customerId, metaData, TbMsgDataType.JSON, JacksonUtil.toString(entityNode)); + tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, null); + } else { + JsonNode deviceAdditionalInfo = device.getAdditionalInfo(); + if (deviceAdditionalInfo == null) { + deviceAdditionalInfo = JacksonUtil.newObjectNode(); + } + if (deviceAdditionalInfo.isObject() && + (!deviceAdditionalInfo.has(DataConstants.LAST_CONNECTED_GATEWAY) + || !gatewayId.toString().equals(deviceAdditionalInfo.get(DataConstants.LAST_CONNECTED_GATEWAY).asText()))) { + ObjectNode newDeviceAdditionalInfo = (ObjectNode) deviceAdditionalInfo; + newDeviceAdditionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString()); Device savedDevice = deviceService.saveDevice(device); - tbClusterService.onDeviceUpdated(savedDevice, null); - device = savedDevice; - - relationService.saveRelation(TenantId.SYS_TENANT_ID, new EntityRelation(gateway.getId(), device.getId(), "Created")); - - TbMsgMetaData metaData = new TbMsgMetaData(); - CustomerId customerId = gateway.getCustomerId(); - if (customerId != null && !customerId.isNullUid()) { - metaData.putValue("customerId", customerId.toString()); - } - metaData.putValue("gatewayId", gatewayId.toString()); - - DeviceId deviceId = device.getId(); - JsonNode entityNode = JacksonUtil.valueToTree(device); - TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, deviceId, customerId, metaData, TbMsgDataType.JSON, JacksonUtil.toString(entityNode)); - tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, null); - } else { - JsonNode deviceAdditionalInfo = device.getAdditionalInfo(); - if (deviceAdditionalInfo == null) { - deviceAdditionalInfo = JacksonUtil.newObjectNode(); - } - if (deviceAdditionalInfo.isObject() && - (!deviceAdditionalInfo.has(DataConstants.LAST_CONNECTED_GATEWAY) - || !gatewayId.toString().equals(deviceAdditionalInfo.get(DataConstants.LAST_CONNECTED_GATEWAY).asText()))) { - ObjectNode newDeviceAdditionalInfo = (ObjectNode) deviceAdditionalInfo; - newDeviceAdditionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString()); - Device savedDevice = deviceService.saveDevice(device); - tbClusterService.onDeviceUpdated(savedDevice, device); - } + tbClusterService.onDeviceUpdated(savedDevice, device); } - GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder() - .setDeviceInfo(getDeviceInfoProto(device)); - DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId()); - if (deviceProfile != null) { - builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); - } else { - log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId()); - } - return TransportApiResponseMsg.newBuilder() - .setGetOrCreateDeviceResponseMsg(builder.build()) - .build(); - } catch (JsonProcessingException e) { - log.warn("[{}] Failed to lookup device by gateway id and name: [{}]", gatewayId, requestMsg.getDeviceName(), e); - throw new RuntimeException(e); - } finally { - deviceCreationLock.unlock(); } - }, dbCallbackExecutorService); + GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder() + .setDeviceInfo(getDeviceInfoProto(device)); + DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId()); + if (deviceProfile != null) { + builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); + } else { + log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId()); + } + return TransportApiResponseMsg.newBuilder() + .setGetOrCreateDeviceResponseMsg(builder.build()) + .build(); + } catch (JsonProcessingException e) { + log.warn("[{}] Failed to lookup device by gateway id and name: [{}]", gatewayId, requestMsg.getDeviceName(), e); + throw new RuntimeException(e); + } finally { + deviceCreationLock.unlock(); + } } private ListenableFuture handle(ProvisionDeviceRequestMsg requestMsg) {