findEntityTimeseriesAndAttributesKeysByQuery improvements

This commit is contained in:
YevhenBondarenko 2020-12-16 11:05:35 +02:00
parent f63b4b1f7c
commit dfb82bf28f
3 changed files with 61 additions and 63 deletions

View File

@ -45,6 +45,8 @@ public class EntityQueryController extends BaseController {
@Autowired
private EntityQueryService entityQueryService;
private static final int MAX_PAGE_SIZE = 100;
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/entitiesQuery/count", method = RequestMethod.POST)
@ResponseBody
@ -91,12 +93,10 @@ public class EntityQueryController extends BaseController {
checkNotNull(query);
try {
EntityDataPageLink pageLink = query.getPageLink();
if (pageLink.getPageSize() > 100) {
pageLink.setPageSize(100);
if (pageLink.getPageSize() > MAX_PAGE_SIZE) {
pageLink.setPageSize(MAX_PAGE_SIZE);
}
DeferredResult<ResponseEntity> response = new DeferredResult<>();
entityQueryService.getKeysByQueryCallback(getCurrentUser(), tenantId, query, isTimeseries, isAttributes, response);
return response;
return entityQueryService.getKeysByQuery(getCurrentUser(), tenantId, query, isTimeseries, isAttributes);
} catch (Exception e) {
throw handleException(e);
}

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.service.query;
import com.datastax.oss.driver.internal.core.util.CollectionsUtils;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
@ -61,6 +60,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@Service
@ -123,25 +123,38 @@ public class DefaultEntityQueryService implements EntityQueryService {
}
}
private EntityDataQuery buildEntityDataQuery(AlarmDataQuery query) {
EntityDataSortOrder sortOrder = query.getPageLink().getSortOrder();
EntityDataSortOrder entitiesSortOrder;
if (sortOrder == null || sortOrder.getKey().getType().equals(EntityKeyType.ALARM_FIELD)) {
entitiesSortOrder = new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, ModelConstants.CREATED_TIME_PROPERTY));
} else {
entitiesSortOrder = sortOrder;
}
EntityDataPageLink edpl = new EntityDataPageLink(maxEntitiesPerAlarmSubscription, 0, null, entitiesSortOrder);
return new EntityDataQuery(query.getEntityFilter(), edpl, query.getEntityFields(), query.getLatestValues(), query.getKeyFilters());
}
@Override
public void getKeysByQueryCallback(SecurityUser securityUser, TenantId tenantId, EntityDataQuery query,
boolean isTimeseries, boolean isAttributes, DeferredResult<ResponseEntity> response) {
public DeferredResult<ResponseEntity> getKeysByQuery(SecurityUser securityUser, TenantId tenantId, EntityDataQuery query,
boolean isTimeseries, boolean isAttributes) {
final DeferredResult<ResponseEntity> response = new DeferredResult<>();
if (!isAttributes && !isTimeseries) {
getEmptyResponseCallback(response);
return;
replyWithEmptyResponse(response);
return response;
}
List<EntityId> ids = this.findEntityDataByQuery(securityUser, query).getData().stream()
.map(EntityData::getEntityId)
.collect(Collectors.toList());
if (ids.isEmpty()) {
getEmptyResponseCallback(response);
return;
replyWithEmptyResponse(response);
return response;
}
Set<EntityType> types = ids.stream().map(EntityId::getEntityType).collect(Collectors.toSet());
ListenableFuture<List<String>> timeseriesKeysFuture;
ListenableFuture<List<String>> attributesKeysFuture;
final ListenableFuture<List<String>> timeseriesKeysFuture;
final ListenableFuture<List<String>> attributesKeysFuture;
if (isTimeseries) {
timeseriesKeysFuture = dbCallbackExecutor.submit(() -> timeseriesService.findAllKeysByEntityIds(tenantId, ids));
@ -155,67 +168,49 @@ public class DefaultEntityQueryService implements EntityQueryService {
typesMap.forEach((type, entityIds) -> futures.add(dbCallbackExecutor.submit(() -> attributesService.findAllKeysByEntityIds(tenantId, type, entityIds))));
attributesKeysFuture = Futures.transform(Futures.allAsList(futures), lists -> {
if (CollectionUtils.isEmpty(lists)) {
return null;
return Collections.emptyList();
}
return lists.stream().flatMap(List::stream).distinct().sorted().collect(Collectors.toList());
}, dbCallbackExecutor);
} else {
attributesKeysFuture = null;
}
if (timeseriesKeysFuture != null && attributesKeysFuture != null) {
Futures.whenAllComplete(timeseriesKeysFuture, attributesKeysFuture).call(() -> {
if (isTimeseries && isAttributes) {
Futures.whenAllComplete(timeseriesKeysFuture, attributesKeysFuture).run(() -> {
try {
getResponseCallback(response, types, timeseriesKeysFuture.get(), attributesKeysFuture.get());
replyWithResponse(response, types, timeseriesKeysFuture.get(), attributesKeysFuture.get());
} catch (Exception e) {
log.error("Failed to fetch timeseries and attributes keys!", e);
AccessValidator.handleError(e, response, HttpStatus.INTERNAL_SERVER_ERROR);
}
return null;
}, dbCallbackExecutor);
} else if (timeseriesKeysFuture != null) {
Futures.addCallback(timeseriesKeysFuture, new FutureCallback<List<String>>() {
@Override
public void onSuccess(@Nullable List<String> keys) {
getResponseCallback(response, types, keys, null);
}
@Override
public void onFailure(Throwable t) {
log.error("Failed to fetch timeseries keys!", t);
AccessValidator.handleError(t, response, HttpStatus.INTERNAL_SERVER_ERROR);
}
}, dbCallbackExecutor);
} else if (isTimeseries) {
addCallback(timeseriesKeysFuture, keys -> replyWithResponse(response, types, keys, null),
error -> {
log.error("Failed to fetch timeseries keys!", error);
AccessValidator.handleError(error, response, HttpStatus.INTERNAL_SERVER_ERROR);
});
} else {
Futures.addCallback(attributesKeysFuture, new FutureCallback<List<String>>() {
@Override
public void onSuccess(@Nullable List<String> keys) {
getResponseCallback(response, types, null, keys);
}
@Override
public void onFailure(Throwable t) {
log.error("Failed to fetch attributes keys!", t);
AccessValidator.handleError(t, response, HttpStatus.INTERNAL_SERVER_ERROR);
}
}, dbCallbackExecutor);
addCallback(attributesKeysFuture, keys -> replyWithResponse(response, types, null, keys),
error -> {
log.error("Failed to fetch attributes keys!", error);
AccessValidator.handleError(error, response, HttpStatus.INTERNAL_SERVER_ERROR);
});
}
return response;
}
private void getResponseCallback(DeferredResult<ResponseEntity> response, Set<EntityType> types, List<String> timeseriesKeys, List<String> attributesKeys) {
private void replyWithResponse(DeferredResult<ResponseEntity> response, Set<EntityType> types, List<String> timeseriesKeys, List<String> attributesKeys) {
ObjectNode json = JacksonUtil.newObjectNode();
addItemsToArrayNode(json.putArray("types"), types);
addItemsToArrayNode(json.putArray("timeseriesKeys"), timeseriesKeys);
addItemsToArrayNode(json.putArray("attributesKeys"), attributesKeys);
response.setResult(new ResponseEntity(json, HttpStatus.OK));
}
private void getEmptyResponseCallback(DeferredResult<ResponseEntity> response) {
getResponseCallback(response, null, null, null);
private void replyWithEmptyResponse(DeferredResult<ResponseEntity> response) {
replyWithResponse(response, Collections.emptySet(), Collections.emptyList(), Collections.emptyList());
}
private void addItemsToArrayNode(ArrayNode arrayNode, Collection<?> collection) {
@ -224,15 +219,18 @@ public class DefaultEntityQueryService implements EntityQueryService {
}
}
private EntityDataQuery buildEntityDataQuery(AlarmDataQuery query) {
EntityDataSortOrder sortOrder = query.getPageLink().getSortOrder();
EntityDataSortOrder entitiesSortOrder;
if (sortOrder == null || sortOrder.getKey().getType().equals(EntityKeyType.ALARM_FIELD)) {
entitiesSortOrder = new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, ModelConstants.CREATED_TIME_PROPERTY));
} else {
entitiesSortOrder = sortOrder;
}
EntityDataPageLink edpl = new EntityDataPageLink(maxEntitiesPerAlarmSubscription, 0, null, entitiesSortOrder);
return new EntityDataQuery(query.getEntityFilter(), edpl, query.getEntityFields(), query.getLatestValues(), query.getKeyFilters());
private void addCallback(ListenableFuture<List<String>> future, Consumer<List<String>> success, Consumer<Throwable> error) {
Futures.addCallback(future, new FutureCallback<List<String>>() {
@Override
public void onSuccess(@Nullable List<String> keys) {
success.accept(keys);
}
@Override
public void onFailure(Throwable t) {
error.accept(t);
}
}, dbCallbackExecutor);
}
}

View File

@ -34,7 +34,7 @@ public interface EntityQueryService {
PageData<AlarmData> findAlarmDataByQuery(SecurityUser securityUser, AlarmDataQuery query);
void getKeysByQueryCallback(SecurityUser securityUser, TenantId tenantId, EntityDataQuery query,
boolean isTimeseries, boolean isAttributes, DeferredResult<ResponseEntity> response);
DeferredResult<ResponseEntity> getKeysByQuery(SecurityUser securityUser, TenantId tenantId, EntityDataQuery query,
boolean isTimeseries, boolean isAttributes);
}