Merge pull request #8389 from thingsboard/fix/device-by-id-async

Remove usages of findDeviceByIdAsync
This commit is contained in:
Andrew Shvayka 2023-04-20 13:10:46 +03:00 committed by GitHub
commit f71e6437c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 88 additions and 147 deletions

View File

@ -90,35 +90,32 @@ public class ClaimDevicesServiceImpl implements ClaimDevicesService {
@Override
public ListenableFuture<Void> registerClaimingInfo(TenantId tenantId, DeviceId deviceId, String secretKey, long durationMs) {
ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(tenantId, deviceId);
return Futures.transformAsync(deviceFuture, device -> {
Cache cache = cacheManager.getCache(CLAIM_DEVICES_CACHE);
List<Object> key = constructCacheKey(device.getId());
if (isAllowedClaimingByDefault) {
if (device.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) {
persistInCache(secretKey, durationMs, cache, key);
return Futures.immediateFuture(null);
}
log.warn("The device [{}] has been already claimed!", device.getName());
throw new IllegalArgumentException();
} else {
ListenableFuture<List<AttributeKvEntry>> claimingAllowedFuture = attributesService.find(tenantId, device.getId(),
DataConstants.SERVER_SCOPE, Collections.singletonList(CLAIM_ATTRIBUTE_NAME));
return Futures.transform(claimingAllowedFuture, list -> {
if (list != null && !list.isEmpty()) {
Optional<Boolean> claimingAllowedOptional = list.get(0).getBooleanValue();
if (claimingAllowedOptional.isPresent() && claimingAllowedOptional.get()
&& device.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) {
persistInCache(secretKey, durationMs, cache, key);
return null;
}
}
log.warn("Failed to find claimingAllowed attribute for device or it is already claimed![{}]", device.getName());
throw new IllegalArgumentException();
}, MoreExecutors.directExecutor());
Device device = deviceService.findDeviceById(tenantId, deviceId);
Cache cache = cacheManager.getCache(CLAIM_DEVICES_CACHE);
List<Object> key = constructCacheKey(device.getId());
if (isAllowedClaimingByDefault) {
if (device.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) {
persistInCache(secretKey, durationMs, cache, key);
return Futures.immediateFuture(null);
}
}, MoreExecutors.directExecutor());
log.warn("The device [{}] has been already claimed!", device.getName());
return Futures.immediateFailedFuture(new IllegalArgumentException());
} else {
ListenableFuture<List<AttributeKvEntry>> claimingAllowedFuture = attributesService.find(tenantId, device.getId(),
DataConstants.SERVER_SCOPE, Collections.singletonList(CLAIM_ATTRIBUTE_NAME));
return Futures.transform(claimingAllowedFuture, list -> {
if (list != null && !list.isEmpty()) {
Optional<Boolean> claimingAllowedOptional = list.get(0).getBooleanValue();
if (claimingAllowedOptional.isPresent() && claimingAllowedOptional.get()
&& device.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) {
persistInCache(secretKey, durationMs, cache, key);
return null;
}
}
log.warn("Failed to find claimingAllowed attribute for device or it is already claimed![{}]", device.getName());
throw new IllegalArgumentException();
}, MoreExecutors.directExecutor());
}
}
private ListenableFuture<ClaimDataInfo> getClaimData(Cache cache, Device device) {

View File

@ -16,7 +16,6 @@
package org.thingsboard.server.service.edge.rpc.processor.device;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -108,8 +107,8 @@ public abstract class BaseDeviceProcessor extends BaseEdgeProcessor {
public ListenableFuture<Void> processDeviceCredentialsMsg(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
log.debug("[{}] Executing processDeviceCredentialsMsg, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg);
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB()));
ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(tenantId, deviceId);
return Futures.transform(deviceFuture, device -> {
return dbCallbackExecutorService.submit(() -> {
Device device = deviceService.findDeviceById(tenantId, deviceId);
if (device != null) {
log.debug("Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]",
device.getName(), deviceCredentialsUpdateMsg.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsValue());
@ -129,6 +128,6 @@ public abstract class BaseDeviceProcessor extends BaseEdgeProcessor {
log.warn("Can't find device by id [{}], deviceCredentialsUpdateMsg [{}]", deviceId, deviceCredentialsUpdateMsg);
}
return null;
}, dbCallbackExecutorService);
});
}
}

View File

@ -264,8 +264,7 @@ public class AccessValidator {
if (currentUser.isSystemAdmin()) {
callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
} else {
ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(currentUser.getTenantId(), new DeviceId(entityId.getId()));
Futures.addCallback(deviceFuture, getCallback(callback, device -> {
Futures.addCallback(Futures.immediateFuture(deviceService.findDeviceById(currentUser.getTenantId(), new DeviceId(entityId.getId()))), getCallback(callback, device -> {
if (device == null) {
return ValidationResult.entityNotFound(DEVICE_WITH_REQUESTED_ID_NOT_FOUND);
} else {

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.dao.device;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
@ -73,14 +72,11 @@ import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.service.PaginatedRemover;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.thingsboard.server.dao.DaoUtil.toUUIDs;
import static org.thingsboard.server.dao.service.Validator.validateId;
@ -508,27 +504,20 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
@Override
public ListenableFuture<List<Device>> findDevicesByQuery(TenantId tenantId, DeviceSearchQuery query) {
ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(tenantId, query.toEntitySearchQuery());
ListenableFuture<List<Device>> devices = Futures.transformAsync(relations, r -> {
return Futures.transform(relations, r -> {
EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection();
List<ListenableFuture<Device>> futures = new ArrayList<>();
List<Device> devices = new ArrayList<>();
for (EntityRelation relation : r) {
EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom();
if (entityId.getEntityType() == EntityType.DEVICE) {
futures.add(findDeviceByIdAsync(tenantId, new DeviceId(entityId.getId())));
Device device = findDeviceById(tenantId, new DeviceId(entityId.getId()));
if (query.getDeviceTypes().contains(device.getType())) {
devices.add(device);
}
}
}
return Futures.successfulAsList(futures);
return devices;
}, MoreExecutors.directExecutor());
devices = Futures.transform(devices, new Function<>() {
@Nullable
@Override
public List<Device> apply(@Nullable List<Device> deviceList) {
return deviceList == null ? Collections.emptyList() : deviceList.stream().filter(device -> query.getDeviceTypes().contains(device.getType())).collect(Collectors.toList());
}
}, MoreExecutors.directExecutor());
return devices;
}
@Override

View File

@ -25,6 +25,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.util.EntityContainer;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DashboardId;
@ -171,13 +172,12 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
}
private ListenableFuture<Boolean> processDevice(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {
return Futures.transformAsync(ctx.getDeviceService().findDeviceByIdAsync(ctx.getTenantId(), new DeviceId(entityContainer.getEntityId().getId())), device -> {
if (device != null) {
return processSave(ctx, sdId, relationType);
} else {
return Futures.immediateFuture(true);
}
}, ctx.getDbCallbackExecutor());
Device device = ctx.getDeviceService().findDeviceById(ctx.getTenantId(), new DeviceId(entityContainer.getEntityId().getId()));
if (device != null) {
return processSave(ctx, sdId, relationType);
} else {
return Futures.immediateFuture(true);
}
}
private ListenableFuture<Boolean> processAsset(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) {

View File

@ -39,7 +39,6 @@ import java.lang.reflect.Type;
import java.util.Map;
import static org.thingsboard.common.util.DonAsynchron.withCallback;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
@Slf4j
public abstract class TbAbstractGetEntityDetailsNode<C extends TbAbstractGetEntityDetailsNodeConfiguration> implements TbNode {
@ -67,7 +66,7 @@ public abstract class TbAbstractGetEntityDetailsNode<C extends TbAbstractGetEnti
protected abstract ListenableFuture<TbMsg> getDetails(TbContext ctx, TbMsg msg);
protected abstract ListenableFuture<ContactBased> getContactBasedListenableFuture(TbContext ctx, TbMsg msg);
protected abstract ListenableFuture<? extends ContactBased> getContactBasedListenableFuture(TbContext ctx, TbMsg msg);
protected MessageData getDataAsJson(TbMsg msg) {
if (this.config.isAddToMetadata()) {
@ -79,7 +78,7 @@ public abstract class TbAbstractGetEntityDetailsNode<C extends TbAbstractGetEnti
protected ListenableFuture<TbMsg> getTbMsgListenableFuture(TbContext ctx, TbMsg msg, MessageData messageData, String prefix) {
if (!this.config.getDetailsList().isEmpty()) {
ListenableFuture<ContactBased> contactBasedListenableFuture = getContactBasedListenableFuture(ctx, msg);
ListenableFuture<? extends ContactBased> contactBasedListenableFuture = getContactBasedListenableFuture(ctx, msg);
ListenableFuture<JsonElement> resultObject = addContactProperties(messageData.getData(), contactBasedListenableFuture, prefix);
return transformMsg(ctx, msg, resultObject, messageData);
} else {
@ -102,7 +101,7 @@ public abstract class TbAbstractGetEntityDetailsNode<C extends TbAbstractGetEnti
}, MoreExecutors.directExecutor());
}
private ListenableFuture<JsonElement> addContactProperties(JsonElement data, ListenableFuture<ContactBased> entityFuture, String prefix) {
private ListenableFuture<JsonElement> addContactProperties(JsonElement data, ListenableFuture<? extends ContactBased> entityFuture, String prefix) {
return Futures.transformAsync(entityFuture, contactBased -> {
if (contactBased != null) {
JsonElement jsonElement = null;

View File

@ -26,6 +26,8 @@ import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.ContactBased;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.HasCustomerId;
import org.thingsboard.server.common.data.HasName;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EdgeId;
@ -59,81 +61,44 @@ public class TbGetCustomerDetailsNode extends TbAbstractGetEntityDetailsNode<TbG
}
@Override
protected ListenableFuture<ContactBased> getContactBasedListenableFuture(TbContext ctx, TbMsg msg) {
return Futures.transformAsync(getCustomer(ctx, msg), customer -> {
if (customer != null) {
return Futures.immediateFuture(customer);
protected ListenableFuture<? extends ContactBased> getContactBasedListenableFuture(TbContext ctx, TbMsg msg) {
return getCustomer(ctx, msg);
}
private ListenableFuture<Customer> getCustomer(TbContext ctx, TbMsg msg) {
ListenableFuture<? extends HasCustomerId> entityFuture;
switch (msg.getOriginator().getEntityType()) { // TODO: use EntityServiceRegistry
case DEVICE:
entityFuture = Futures.immediateFuture(ctx.getDeviceService().findDeviceById(ctx.getTenantId(), (DeviceId) msg.getOriginator()));
break;
case ASSET:
entityFuture = ctx.getAssetService().findAssetByIdAsync(ctx.getTenantId(), (AssetId) msg.getOriginator());
break;
case ENTITY_VIEW:
entityFuture = ctx.getEntityViewService().findEntityViewByIdAsync(ctx.getTenantId(), (EntityViewId) msg.getOriginator());
break;
case USER:
entityFuture = ctx.getUserService().findUserByIdAsync(ctx.getTenantId(), (UserId) msg.getOriginator());
break;
case EDGE:
entityFuture = ctx.getEdgeService().findEdgeByIdAsync(ctx.getTenantId(), (EdgeId) msg.getOriginator());
break;
default:
throw new RuntimeException(msg.getOriginator().getEntityType().getNormalName() + " entities not supported");
}
return Futures.transformAsync(entityFuture, entity -> {
if (entity != null) {
if (!entity.getCustomerId().isNullUid()) {
return ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), entity.getCustomerId());
} else {
throw new RuntimeException(msg.getOriginator().getEntityType().getNormalName() +
(entity instanceof HasName ? " with name '" + ((HasName) entity).getName() + "'" : "")
+ " is not assigned to Customer");
}
} else {
return Futures.immediateFuture(null);
}
}, MoreExecutors.directExecutor());
}
private ListenableFuture<Customer> getCustomer(TbContext ctx, TbMsg msg) {
switch (msg.getOriginator().getEntityType()) {
case DEVICE:
return Futures.transformAsync(ctx.getDeviceService().findDeviceByIdAsync(ctx.getTenantId(), new DeviceId(msg.getOriginator().getId())), device -> {
if (device != null) {
if (!device.getCustomerId().isNullUid()) {
return ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), device.getCustomerId());
} else {
throw new RuntimeException("Device with name '" + device.getName() + "' is not assigned to Customer.");
}
} else {
return Futures.immediateFuture(null);
}
}, MoreExecutors.directExecutor());
case ASSET:
return Futures.transformAsync(ctx.getAssetService().findAssetByIdAsync(ctx.getTenantId(), new AssetId(msg.getOriginator().getId())), asset -> {
if (asset != null) {
if (!asset.getCustomerId().isNullUid()) {
return ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), asset.getCustomerId());
} else {
throw new RuntimeException("Asset with name '" + asset.getName() + "' is not assigned to Customer.");
}
} else {
return Futures.immediateFuture(null);
}
}, MoreExecutors.directExecutor());
case ENTITY_VIEW:
return Futures.transformAsync(ctx.getEntityViewService().findEntityViewByIdAsync(ctx.getTenantId(), new EntityViewId(msg.getOriginator().getId())), entityView -> {
if (entityView != null) {
if (!entityView.getCustomerId().isNullUid()) {
return ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), entityView.getCustomerId());
} else {
throw new RuntimeException("EntityView with name '" + entityView.getName() + "' is not assigned to Customer.");
}
} else {
return Futures.immediateFuture(null);
}
}, MoreExecutors.directExecutor());
case USER:
return Futures.transformAsync(ctx.getUserService().findUserByIdAsync(ctx.getTenantId(), new UserId(msg.getOriginator().getId())), user -> {
if (user != null) {
if (!user.getCustomerId().isNullUid()) {
return ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), user.getCustomerId());
} else {
throw new RuntimeException("User with name '" + user.getName() + "' is not assigned to Customer.");
}
} else {
return Futures.immediateFuture(null);
}
}, MoreExecutors.directExecutor());
case EDGE:
return Futures.transformAsync(ctx.getEdgeService().findEdgeByIdAsync(ctx.getTenantId(), new EdgeId(msg.getOriginator().getId())), edge -> {
if (edge != null) {
if (!edge.getCustomerId().isNullUid()) {
return ctx.getCustomerService().findCustomerByIdAsync(ctx.getTenantId(), edge.getCustomerId());
} else {
throw new RuntimeException("Edge with name '" + edge.getName() + "' is not assigned to Customer.");
}
} else {
return Futures.immediateFuture(null);
}
}, MoreExecutors.directExecutor());
default:
throw new RuntimeException("Entity with entityType '" + msg.getOriginator().getEntityType() + "' is not supported.");
}
}
}

View File

@ -53,13 +53,7 @@ public class TbGetTenantDetailsNode extends TbAbstractGetEntityDetailsNode<TbGet
}
@Override
protected ListenableFuture<ContactBased> getContactBasedListenableFuture(TbContext ctx, TbMsg msg) {
return Futures.transformAsync(ctx.getTenantService().findTenantByIdAsync(ctx.getTenantId(), ctx.getTenantId()), tenant -> {
if (tenant != null) {
return Futures.immediateFuture(tenant);
} else {
return Futures.immediateFuture(null);
}
}, MoreExecutors.directExecutor());
protected ListenableFuture<? extends ContactBased> getContactBasedListenableFuture(TbContext ctx, TbMsg msg) {
return ctx.getTenantService().findTenantByIdAsync(ctx.getTenantId(), ctx.getTenantId());
}
}

View File

@ -31,7 +31,6 @@ public class EntitiesCustomerIdAsyncLoader {
public static ListenableFuture<CustomerId> findEntityIdAsync(TbContext ctx, EntityId original) {
switch (original.getEntityType()) {
case CUSTOMER:
return Futures.immediateFuture((CustomerId) original);
@ -40,7 +39,7 @@ public class EntitiesCustomerIdAsyncLoader {
case ASSET:
return getCustomerAsync(ctx.getAssetService().findAssetByIdAsync(ctx.getTenantId(), (AssetId) original));
case DEVICE:
return getCustomerAsync(ctx.getDeviceService().findDeviceByIdAsync(ctx.getTenantId(), (DeviceId) original));
return getCustomerAsync(Futures.immediateFuture(ctx.getDeviceService().findDeviceById(ctx.getTenantId(), (DeviceId) original)));
default:
return Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original.getEntityType()));
}

View File

@ -37,7 +37,7 @@ import java.util.function.Function;
public class EntitiesFieldsAsyncLoader {
public static ListenableFuture<EntityFieldsData> findAsync(TbContext ctx, EntityId original) {
switch (original.getEntityType()) {
switch (original.getEntityType()) { // TODO: use EntityServiceRegistry
case TENANT:
return getAsync(ctx.getTenantService().findTenantByIdAsync(ctx.getTenantId(), (TenantId) original),
EntityFieldsData::new);
@ -51,7 +51,7 @@ public class EntitiesFieldsAsyncLoader {
return getAsync(ctx.getAssetService().findAssetByIdAsync(ctx.getTenantId(), (AssetId) original),
EntityFieldsData::new);
case DEVICE:
return getAsync(ctx.getDeviceService().findDeviceByIdAsync(ctx.getTenantId(), (DeviceId) original),
return getAsync(Futures.immediateFuture(ctx.getDeviceService().findDeviceById(ctx.getTenantId(), (DeviceId) original)),
EntityFieldsData::new);
case ALARM:
return getAsync(ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), (AlarmId) original),

View File

@ -219,7 +219,7 @@ public abstract class AbstractAttributeNodeTest {
void mockFindDevice(Device device) {
when(ctx.getDeviceService()).thenReturn(deviceService);
when(deviceService.findDeviceByIdAsync(any(), eq(device.getId()))).thenReturn(Futures.immediateFuture(device));
when(deviceService.findDeviceById(any(), eq(device.getId()))).thenReturn(device);
}
void mockFindAsset(Asset asset) {