refactoring
This commit is contained in:
parent
c43e8950a2
commit
fbdbea7f59
@ -20,8 +20,10 @@ import com.google.common.util.concurrent.Futures;
|
|||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||||
|
import org.thingsboard.server.cluster.TbClusterService;
|
||||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
||||||
import org.thingsboard.server.common.data.EntityType;
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
import org.thingsboard.server.common.data.EntityView;
|
import org.thingsboard.server.common.data.EntityView;
|
||||||
@ -45,7 +47,6 @@ import org.thingsboard.server.gen.transport.TransportProtos;
|
|||||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||||
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
|
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
|
||||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
||||||
import org.thingsboard.server.cluster.TbClusterService;
|
|
||||||
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
|
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
@ -118,57 +119,44 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void saveAndNotify(TenantId tenantId, CustomerId customerId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
|
public void saveAndNotify(TenantId tenantId, CustomerId customerId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
|
||||||
checkInternalEntity(entityId);
|
doSaveAndNotify(tenantId, customerId, entityId, ts, ttl, callback, true);
|
||||||
boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
|
|
||||||
if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
|
|
||||||
saveAndNotifyInternal(tenantId, entityId, ts, ttl, new FutureCallback<Integer>() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(Integer result) {
|
|
||||||
if (!sysTenant && result != null && result > 0) {
|
|
||||||
apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result);
|
|
||||||
}
|
|
||||||
callback.onSuccess(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Throwable t) {
|
|
||||||
callback.onFailure(t);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
callback.onFailure(new RuntimeException("DB storage writes are disabled due to API limits!"));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void saveWithoutLatestAndNotify(TenantId tenantId, CustomerId customerId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
|
public void saveWithoutLatestAndNotify(TenantId tenantId, CustomerId customerId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
|
||||||
|
doSaveAndNotify(tenantId, customerId, entityId, ts, ttl, callback, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doSaveAndNotify(TenantId tenantId, CustomerId customerId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback, boolean saveLatest) {
|
||||||
checkInternalEntity(entityId);
|
checkInternalEntity(entityId);
|
||||||
boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
|
boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
|
||||||
if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
|
if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
|
||||||
saveWithoutLatestAndNotifyInternal(tenantId, entityId, ts, ttl, new FutureCallback<Integer>() {
|
if (saveLatest) {
|
||||||
@Override
|
saveAndNotifyInternal(tenantId, entityId, ts, ttl, getCallback(tenantId, customerId, sysTenant, callback));
|
||||||
public void onSuccess(Integer result) {
|
} else {
|
||||||
if (!sysTenant && result != null && result > 0) {
|
saveWithoutLatestAndNotifyInternal(tenantId, entityId, ts, ttl, getCallback(tenantId, customerId, sysTenant, callback));
|
||||||
apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result);
|
}
|
||||||
}
|
|
||||||
callback.onSuccess(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Throwable t) {
|
|
||||||
callback.onFailure(t);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
} else {
|
||||||
callback.onFailure(new RuntimeException("DB storage writes are disabled due to API limits!"));
|
callback.onFailure(new RuntimeException("DB storage writes are disabled due to API limits!"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void saveWithoutLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Integer> callback) {
|
@NotNull
|
||||||
ListenableFuture<Integer> saveFuture = tsService.saveWithoutLatest(tenantId, entityId, ts, ttl);
|
private FutureCallback<Integer> getCallback(TenantId tenantId, CustomerId customerId, boolean sysTenant, FutureCallback<Void> callback) {
|
||||||
addMainCallback(saveFuture, callback);
|
return new FutureCallback<>() {
|
||||||
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
|
@Override
|
||||||
// TODO: 12/11/2021 do we need entityView searching here?
|
public void onSuccess(Integer result) {
|
||||||
|
if (!sysTenant && result != null && result > 0) {
|
||||||
|
apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result);
|
||||||
|
}
|
||||||
|
callback.onSuccess(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
callback.onFailure(t);
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -179,6 +167,15 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
@Override
|
@Override
|
||||||
public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Integer> callback) {
|
public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Integer> callback) {
|
||||||
ListenableFuture<Integer> saveFuture = tsService.save(tenantId, entityId, ts, ttl);
|
ListenableFuture<Integer> saveFuture = tsService.save(tenantId, entityId, ts, ttl);
|
||||||
|
addCallbacks(tenantId, entityId, ts, callback, saveFuture);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void saveWithoutLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Integer> callback) {
|
||||||
|
ListenableFuture<Integer> saveFuture = tsService.saveWithoutLatest(tenantId, entityId, ts, ttl);
|
||||||
|
addCallbacks(tenantId, entityId, ts, callback, saveFuture);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addCallbacks(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Integer> callback, ListenableFuture<Integer> saveFuture) {
|
||||||
addMainCallback(saveFuture, callback);
|
addMainCallback(saveFuture, callback);
|
||||||
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
|
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
|
||||||
if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) {
|
if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) {
|
||||||
|
|||||||
@ -145,24 +145,26 @@ public class BaseTimeseriesService implements TimeseriesService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
|
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
|
||||||
List<ListenableFuture<Integer>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY);
|
return doSave(tenantId, entityId, tsKvEntries, ttl, true);
|
||||||
for (TsKvEntry tsKvEntry : tsKvEntries) {
|
|
||||||
if (tsKvEntry == null) {
|
|
||||||
throw new IncorrectParameterException("Key value entry can't be null");
|
|
||||||
}
|
|
||||||
saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl);
|
|
||||||
}
|
|
||||||
return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Integer> saveWithoutLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
|
public ListenableFuture<Integer> saveWithoutLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
|
||||||
List<ListenableFuture<Integer>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY_WITHOUT_LATEST);
|
return doSave(tenantId, entityId, tsKvEntries, ttl, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ListenableFuture<Integer> doSave(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl, boolean saveLatest) {
|
||||||
|
int inserts = saveLatest ? INSERTS_PER_ENTRY : INSERTS_PER_ENTRY_WITHOUT_LATEST;
|
||||||
|
List<ListenableFuture<Integer>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * inserts);
|
||||||
for (TsKvEntry tsKvEntry : tsKvEntries) {
|
for (TsKvEntry tsKvEntry : tsKvEntries) {
|
||||||
if (tsKvEntry == null) {
|
if (tsKvEntry == null) {
|
||||||
throw new IncorrectParameterException("Key value entry can't be null");
|
throw new IncorrectParameterException("Key value entry can't be null");
|
||||||
}
|
}
|
||||||
saveWithoutLatestAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl);
|
if (saveLatest) {
|
||||||
|
saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl);
|
||||||
|
} else {
|
||||||
|
saveWithoutLatestAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor());
|
return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor());
|
||||||
}
|
}
|
||||||
@ -180,15 +182,15 @@ public class BaseTimeseriesService implements TimeseriesService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void saveAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Integer>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
private void saveAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Integer>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
||||||
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
|
doSaveAndRegisterFuturesFor(tenantId, futures, entityId, tsKvEntry, ttl);
|
||||||
throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only");
|
|
||||||
}
|
|
||||||
futures.add(timeseriesDao.savePartition(tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey()));
|
|
||||||
futures.add(Futures.transform(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry), v -> 0, MoreExecutors.directExecutor()));
|
futures.add(Futures.transform(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry), v -> 0, MoreExecutors.directExecutor()));
|
||||||
futures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void saveWithoutLatestAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Integer>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
private void saveWithoutLatestAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Integer>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
||||||
|
doSaveAndRegisterFuturesFor(tenantId, futures, entityId, tsKvEntry, ttl);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doSaveAndRegisterFuturesFor(TenantId tenantId, List<ListenableFuture<Integer>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
||||||
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
|
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
|
||||||
throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only");
|
throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only");
|
||||||
}
|
}
|
||||||
|
|||||||
@ -23,9 +23,8 @@ import org.thingsboard.rule.engine.api.TbContext;
|
|||||||
import org.thingsboard.rule.engine.api.TbNode;
|
import org.thingsboard.rule.engine.api.TbNode;
|
||||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||||
import org.thingsboard.server.common.data.TenantProfile;
|
|
||||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
||||||
import org.thingsboard.server.common.data.id.CustomerId;
|
import org.thingsboard.server.common.data.TenantProfile;
|
||||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
@ -46,7 +45,9 @@ import java.util.concurrent.TimeUnit;
|
|||||||
name = "save timeseries",
|
name = "save timeseries",
|
||||||
configClazz = TbMsgTimeseriesNodeConfiguration.class,
|
configClazz = TbMsgTimeseriesNodeConfiguration.class,
|
||||||
nodeDescription = "Saves timeseries data",
|
nodeDescription = "Saves timeseries data",
|
||||||
nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type",
|
nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type. " +
|
||||||
|
"Timestamp in milliseconds will be taken from metadata.ts, otherwise 'now' timestamp will be applied. " +
|
||||||
|
"Allows stopping updating values for incoming keys in the latest ts_kv table if 'skipLatestPersistence' is set to true.",
|
||||||
uiResources = {"static/rulenode/rulenode-core-config.js"},
|
uiResources = {"static/rulenode/rulenode-core-config.js"},
|
||||||
configDirective = "tbActionNodeCustomTimeseriesConfig",
|
configDirective = "tbActionNodeCustomTimeseriesConfig",
|
||||||
icon = "file_upload"
|
icon = "file_upload"
|
||||||
@ -107,7 +108,7 @@ public class TbMsgTimeseriesNode implements TbNode {
|
|||||||
if (!StringUtils.isEmpty(tsStr)) {
|
if (!StringUtils.isEmpty(tsStr)) {
|
||||||
try {
|
try {
|
||||||
ts = Long.parseLong(tsStr);
|
ts = Long.parseLong(tsStr);
|
||||||
} catch (NumberFormatException e) {
|
} catch (NumberFormatException ignored) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ts = msg.getTs();
|
ts = msg.getTs();
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user