Use new thread pool for handling gateway requests

This commit is contained in:
Andrii Shvaika 2023-11-06 18:06:05 +02:00
parent 162bf8a1f2
commit 3b8c6869bd

View File

@ -192,7 +192,7 @@ public class DefaultTransportApiService implements TransportApiService {
final String certChain = msg.getCertificateChain(); final String certChain = msg.getCertificateChain();
result = handlerExecutor.submit(() -> validateOrCreateDeviceX509Certificate(certChain)); result = handlerExecutor.submit(() -> validateOrCreateDeviceX509Certificate(certChain));
} else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) { } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) {
result = handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()); result = handlerExecutor.submit(() -> handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()));
} else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) { } else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) {
result = handle(transportApiRequestMsg.getEntityProfileRequestMsg()); result = handle(transportApiRequestMsg.getEntityProfileRequestMsg());
} else if (transportApiRequestMsg.hasLwM2MRequestMsg()) { } else if (transportApiRequestMsg.hasLwM2MRequestMsg()) {
@ -223,7 +223,6 @@ public class DefaultTransportApiService implements TransportApiService {
} }
private TransportApiResponseMsg validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) { private TransportApiResponseMsg validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
//TODO: Make async and enable caching
DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId); DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);
if (credentials != null && credentials.getCredentialsType() == credentialsType) { if (credentials != null && credentials.getCredentialsType() == credentialsType) {
return getDeviceInfo(credentials); return getDeviceInfo(credentials);
@ -336,76 +335,74 @@ public class DefaultTransportApiService implements TransportApiService {
return VALID; return VALID;
} }
private ListenableFuture<TransportApiResponseMsg> handle(GetOrCreateDeviceFromGatewayRequestMsg requestMsg) { private TransportApiResponseMsg handle(GetOrCreateDeviceFromGatewayRequestMsg requestMsg) {
DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB())); DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB()));
ListenableFuture<Device> gatewayFuture = deviceService.findDeviceByIdAsync(TenantId.SYS_TENANT_ID, gatewayId); Device gateway = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, gatewayId);
return Futures.transform(gatewayFuture, gateway -> { Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock());
Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock()); deviceCreationLock.lock();
deviceCreationLock.lock(); try {
try { Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName());
Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName()); if (device == null) {
if (device == null) { TenantId tenantId = gateway.getTenantId();
TenantId tenantId = gateway.getTenantId(); device = new Device();
device = new Device(); device.setTenantId(tenantId);
device.setTenantId(tenantId); device.setName(requestMsg.getDeviceName());
device.setName(requestMsg.getDeviceName()); device.setType(requestMsg.getDeviceType());
device.setType(requestMsg.getDeviceType()); device.setCustomerId(gateway.getCustomerId());
device.setCustomerId(gateway.getCustomerId()); DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType());
DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType());
device.setDeviceProfileId(deviceProfile.getId()); device.setDeviceProfileId(deviceProfile.getId());
ObjectNode additionalInfo = JacksonUtil.newObjectNode(); ObjectNode additionalInfo = JacksonUtil.newObjectNode();
additionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString()); additionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString());
device.setAdditionalInfo(additionalInfo); device.setAdditionalInfo(additionalInfo);
Device savedDevice = deviceService.saveDevice(device);
tbClusterService.onDeviceUpdated(savedDevice, null);
device = savedDevice;
relationService.saveRelation(TenantId.SYS_TENANT_ID, new EntityRelation(gateway.getId(), device.getId(), "Created"));
TbMsgMetaData metaData = new TbMsgMetaData();
CustomerId customerId = gateway.getCustomerId();
if (customerId != null && !customerId.isNullUid()) {
metaData.putValue("customerId", customerId.toString());
}
metaData.putValue("gatewayId", gatewayId.toString());
DeviceId deviceId = device.getId();
JsonNode entityNode = JacksonUtil.valueToTree(device);
TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, deviceId, customerId, metaData, TbMsgDataType.JSON, JacksonUtil.toString(entityNode));
tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, null);
} else {
JsonNode deviceAdditionalInfo = device.getAdditionalInfo();
if (deviceAdditionalInfo == null) {
deviceAdditionalInfo = JacksonUtil.newObjectNode();
}
if (deviceAdditionalInfo.isObject() &&
(!deviceAdditionalInfo.has(DataConstants.LAST_CONNECTED_GATEWAY)
|| !gatewayId.toString().equals(deviceAdditionalInfo.get(DataConstants.LAST_CONNECTED_GATEWAY).asText()))) {
ObjectNode newDeviceAdditionalInfo = (ObjectNode) deviceAdditionalInfo;
newDeviceAdditionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString());
Device savedDevice = deviceService.saveDevice(device); Device savedDevice = deviceService.saveDevice(device);
tbClusterService.onDeviceUpdated(savedDevice, null); tbClusterService.onDeviceUpdated(savedDevice, device);
device = savedDevice;
relationService.saveRelation(TenantId.SYS_TENANT_ID, new EntityRelation(gateway.getId(), device.getId(), "Created"));
TbMsgMetaData metaData = new TbMsgMetaData();
CustomerId customerId = gateway.getCustomerId();
if (customerId != null && !customerId.isNullUid()) {
metaData.putValue("customerId", customerId.toString());
}
metaData.putValue("gatewayId", gatewayId.toString());
DeviceId deviceId = device.getId();
JsonNode entityNode = JacksonUtil.valueToTree(device);
TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, deviceId, customerId, metaData, TbMsgDataType.JSON, JacksonUtil.toString(entityNode));
tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, null);
} else {
JsonNode deviceAdditionalInfo = device.getAdditionalInfo();
if (deviceAdditionalInfo == null) {
deviceAdditionalInfo = JacksonUtil.newObjectNode();
}
if (deviceAdditionalInfo.isObject() &&
(!deviceAdditionalInfo.has(DataConstants.LAST_CONNECTED_GATEWAY)
|| !gatewayId.toString().equals(deviceAdditionalInfo.get(DataConstants.LAST_CONNECTED_GATEWAY).asText()))) {
ObjectNode newDeviceAdditionalInfo = (ObjectNode) deviceAdditionalInfo;
newDeviceAdditionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString());
Device savedDevice = deviceService.saveDevice(device);
tbClusterService.onDeviceUpdated(savedDevice, device);
}
} }
GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder()
.setDeviceInfo(getDeviceInfoProto(device));
DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId());
if (deviceProfile != null) {
builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)));
} else {
log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId());
}
return TransportApiResponseMsg.newBuilder()
.setGetOrCreateDeviceResponseMsg(builder.build())
.build();
} catch (JsonProcessingException e) {
log.warn("[{}] Failed to lookup device by gateway id and name: [{}]", gatewayId, requestMsg.getDeviceName(), e);
throw new RuntimeException(e);
} finally {
deviceCreationLock.unlock();
} }
}, dbCallbackExecutorService); GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder()
.setDeviceInfo(getDeviceInfoProto(device));
DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId());
if (deviceProfile != null) {
builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)));
} else {
log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId());
}
return TransportApiResponseMsg.newBuilder()
.setGetOrCreateDeviceResponseMsg(builder.build())
.build();
} catch (JsonProcessingException e) {
log.warn("[{}] Failed to lookup device by gateway id and name: [{}]", gatewayId, requestMsg.getDeviceName(), e);
throw new RuntimeException(e);
} finally {
deviceCreationLock.unlock();
}
} }
private ListenableFuture<TransportApiResponseMsg> handle(ProvisionDeviceRequestMsg requestMsg) { private ListenableFuture<TransportApiResponseMsg> handle(ProvisionDeviceRequestMsg requestMsg) {