Introduce TbTelemetryService

This commit is contained in:
ViacheslavKlimov 2025-05-05 17:41:30 +03:00
parent 1e5b6cc9a9
commit 26a41b7c32
3 changed files with 144 additions and 14 deletions

View File

@ -92,6 +92,7 @@ import org.thingsboard.server.service.security.AccessValidator;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.permission.Operation;
import org.thingsboard.server.service.telemetry.AttributeData;
import org.thingsboard.server.service.telemetry.TbTelemetryService;
import org.thingsboard.server.service.telemetry.TsData;
import java.util.ArrayList;
@ -155,6 +156,9 @@ public class TelemetryController extends BaseController {
@Autowired
private AccessValidator accessValidator;
@Autowired
private TbTelemetryService tbTelemetryService;
@Value("${transport.json.max_string_value_length:0}")
private int maxStringValueLength;
@ -323,20 +327,11 @@ public class TelemetryController extends BaseController {
@RequestParam(name = "orderBy", defaultValue = "DESC") String orderBy,
@Parameter(description = STRICT_DATA_TYPES_DESCRIPTION)
@RequestParam(name = "useStrictDataTypes", required = false, defaultValue = "false") Boolean useStrictDataTypes) throws ThingsboardException {
return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr,
(result, tenantId, entityId) -> {
AggregationParams params;
Aggregation agg = Aggregation.valueOf(aggStr);
if (Aggregation.NONE.equals(agg)) {
params = AggregationParams.none();
} else if (intervalType == null || IntervalType.MILLISECONDS.equals(intervalType)) {
params = interval == 0L ? AggregationParams.none() : AggregationParams.milliseconds(agg, interval);
} else {
params = AggregationParams.calendar(agg, intervalType, timeZone);
}
List<ReadTsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, params, limit, orderBy)).collect(Collectors.toList());
Futures.addCallback(tsService.findAll(tenantId, entityId, queries), getTsKvListCallback(result, useStrictDataTypes), MoreExecutors.directExecutor());
});
DeferredResult<ResponseEntity> response = new DeferredResult<>();
Futures.addCallback(tbTelemetryService.getTimeseries(EntityIdFactory.getByTypeAndId(entityType, entityIdStr), toKeysList(keys), startTs, endTs,
intervalType, interval, timeZone, limit, Aggregation.valueOf(aggStr), orderBy, useStrictDataTypes, getCurrentUser()),
getTsKvListCallback(response, useStrictDataTypes), MoreExecutors.directExecutor());
return response;
}
@ApiOperation(value = "Save device attributes (saveDeviceAttributes)",

View File

@ -0,0 +1,92 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.telemetry;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.AggregationParams;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.IntervalType;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.service.security.AccessValidator;
import org.thingsboard.server.service.security.ValidationResult;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.permission.Operation;
import java.util.List;
import java.util.stream.Collectors;
@Service
@Slf4j
@RequiredArgsConstructor
public class DefaultTbTelemetryService implements TbTelemetryService {
private final TimeseriesService tsService;
private final AccessValidator accessValidator;
@Override
public ListenableFuture<List<TsKvEntry>> getTimeseries(EntityId entityId, List<String> keys, Long startTs, Long endTs, IntervalType intervalType,
Long interval, String timeZone, Integer limit, Aggregation agg, String orderBy,
Boolean useStrictDataTypes, SecurityUser currentUser) {
SettableFuture<List<TsKvEntry>> future = SettableFuture.create();
accessValidator.validate(currentUser, Operation.READ_TELEMETRY, entityId, new FutureCallback<>() {
@Override
public void onSuccess(ValidationResult validationResult) {
try {
AggregationParams params;
if (Aggregation.NONE.equals(agg)) {
params = AggregationParams.none();
} else if (intervalType == null || IntervalType.MILLISECONDS.equals(intervalType)) {
params = interval == 0L ? AggregationParams.none() : AggregationParams.milliseconds(agg, interval);
} else {
params = AggregationParams.calendar(agg, intervalType, timeZone);
}
List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, params, limit, orderBy)).collect(Collectors.toList());
Futures.addCallback(tsService.findAll(currentUser.getTenantId(), entityId, queries), new FutureCallback<>() {
@Override
public void onSuccess(List<TsKvEntry> result) {
future.set(result);
}
@Override
public void onFailure(Throwable t) {
future.setException(t);
}
}, MoreExecutors.directExecutor());
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable t) {
future.setException(t);
}
});
return future;
}
}

View File

@ -0,0 +1,43 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.telemetry;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.IntervalType;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.service.security.model.SecurityUser;
import java.util.List;
public interface TbTelemetryService {
ListenableFuture<List<TsKvEntry>> getTimeseries(EntityId entityId,
List<String> keys,
Long startTs,
Long endTs,
IntervalType intervalType,
Long interval,
String timeZone,
Integer limit,
Aggregation agg,
String orderBy,
Boolean useStrictDataTypes,
SecurityUser currentUser) throws ThingsboardException;
}