Save time series strategies: initial BE implementation

This commit is contained in:
Dmytro Skarzhynets 2025-01-09 17:43:58 +02:00 committed by Dmytro Skarzhynets
commit 6ca45f1962
10 changed files with 331 additions and 22 deletions

View File

@ -348,7 +348,8 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
.tenantId(entityView.getTenantId())
.entityId(entityId)
.entries(latestValues)
.onlyLatest(true)
.saveTimeseries(false)
.saveLatest(true)
.callback(new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void tmp) {

View File

@ -118,10 +118,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
EntityId entityId = request.getEntityId();
checkInternalEntity(entityId);
boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
if (sysTenant || request.isOnlyLatest() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
if (sysTenant || !request.isSaveTimeseries() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
KvUtils.validate(request.getEntries(), valueNoXssValidation);
ListenableFuture<Integer> future = saveTimeseriesInternal(request);
if (!request.isOnlyLatest()) {
if (request.isSaveTimeseries()) {
FutureCallback<Integer> callback = getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback());
Futures.addCallback(future, callback, tsCallBackExecutor);
}
@ -135,17 +135,21 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
TenantId tenantId = request.getTenantId();
EntityId entityId = request.getEntityId();
ListenableFuture<Integer> saveFuture;
if (request.isOnlyLatest()) {
saveFuture = Futures.transform(tsService.saveLatest(tenantId, entityId, request.getEntries()), result -> 0, MoreExecutors.directExecutor());
} else if (request.isSaveLatest()) {
if (request.isSaveTimeseries() && request.isSaveLatest()) {
saveFuture = tsService.save(tenantId, entityId, request.getEntries(), request.getTtl());
} else {
} else if (request.isSaveLatest()) {
saveFuture = Futures.transform(tsService.saveLatest(tenantId, entityId, request.getEntries()), result -> 0, MoreExecutors.directExecutor());
} else if (request.isSaveTimeseries()) {
saveFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl());
} else {
saveFuture = Futures.immediateFuture(0);
}
addMainCallback(saveFuture, request.getCallback());
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
if (request.isSaveLatest() && !request.isOnlyLatest()) {
if (request.isSendWsUpdate()) {
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
}
if (request.isSaveTimeseries() && request.isSaveLatest()) {
addEntityViewCallback(tenantId, entityId, request.getEntries());
}
return saveFuture;
@ -232,7 +236,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
.tenantId(tenantId)
.entityId(entityView.getId())
.entries(entityViewLatest)
.onlyLatest(true)
.saveTimeseries(false)
.saveLatest(true)
.callback(new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void tmp) {}

View File

@ -38,8 +38,9 @@ public class TimeseriesSaveRequest {
private final EntityId entityId;
private final List<TsKvEntry> entries;
private final long ttl;
private final boolean saveTimeseries;
private final boolean saveLatest;
private final boolean onlyLatest;
private final boolean sendWsUpdate;
private final FutureCallback<Void> callback;
public static Builder builder() {
@ -53,9 +54,10 @@ public class TimeseriesSaveRequest {
private EntityId entityId;
private List<TsKvEntry> entries;
private long ttl;
private FutureCallback<Void> callback;
private boolean saveTimeseries = true;
private boolean saveLatest = true;
private boolean onlyLatest;
private boolean sendWsUpdate = true;
private FutureCallback<Void> callback;
Builder() {}
@ -92,14 +94,18 @@ public class TimeseriesSaveRequest {
return this;
}
public Builder saveTimeseries(boolean saveTimeseries) {
this.saveTimeseries = saveTimeseries;
return this;
}
public Builder saveLatest(boolean saveLatest) {
this.saveLatest = saveLatest;
return this;
}
public Builder onlyLatest(boolean onlyLatest) {
this.onlyLatest = onlyLatest;
this.saveLatest = true;
public Builder sendWsUpdate(boolean sendWsUpdate) {
this.sendWsUpdate = sendWsUpdate;
return this;
}
@ -123,7 +129,7 @@ public class TimeseriesSaveRequest {
}
public TimeseriesSaveRequest build() {
return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, onlyLatest, callback);
return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveTimeseries, saveLatest, sendWsUpdate, callback);
}
}

View File

@ -37,8 +37,14 @@ import org.thingsboard.server.common.msg.TbMsg;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Deduplicate;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.WebSocketsOnly;
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST;
@Slf4j
@ -68,12 +74,15 @@ public class TbMsgTimeseriesNode implements TbNode {
private TbContext ctx;
private long tenantProfileDefaultStorageTtl;
private PersistenceSettings persistenceSettings;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbMsgTimeseriesNodeConfiguration.class);
this.ctx = ctx;
ctx.addTenantProfileListener(this::onTenantProfileUpdate);
onTenantProfileUpdate(ctx.getTenantProfile());
persistenceSettings = config.getPersistenceSettings();
}
void onTenantProfileUpdate(TenantProfile tenantProfile) {
@ -88,6 +97,18 @@ public class TbMsgTimeseriesNode implements TbNode {
return;
}
long ts = computeTs(msg, config.isUseServerTs());
PersistenceDecision persistenceDecision = makePersistenceDecision(ts, msg.getOriginator().getId());
boolean saveTimeseries = persistenceDecision.saveTimeseries();
boolean saveLatest = persistenceDecision.saveLatest();
boolean sendWsUpdate = persistenceDecision.sendWsUpdate();
// short-circuit
if (!saveTimeseries && !saveLatest && !sendWsUpdate) {
ctx.tellSuccess(msg);
return;
}
String src = msg.getData();
Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(src), ts);
if (tsKvMap.isEmpty()) {
@ -111,7 +132,9 @@ public class TbMsgTimeseriesNode implements TbNode {
.entityId(msg.getOriginator())
.entries(tsKvEntryList)
.ttl(ttl)
.saveLatest(!config.isSkipLatestPersistence())
.saveTimeseries(saveTimeseries)
.saveLatest(saveLatest)
.sendWsUpdate(sendWsUpdate)
.callback(new TelemetryNodeCallback(ctx, msg))
.build());
}
@ -120,6 +143,37 @@ public class TbMsgTimeseriesNode implements TbNode {
return ignoreMetadataTs ? System.currentTimeMillis() : msg.getMetaDataTs();
}
private record PersistenceDecision(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate) {}
private PersistenceDecision makePersistenceDecision(long ts, UUID originatorUuid) {
boolean saveTimeseries;
boolean saveLatest;
boolean sendWsUpdate;
if (persistenceSettings instanceof OnEveryMessage) {
saveTimeseries = true;
saveLatest = true;
sendWsUpdate = true;
} else if (persistenceSettings instanceof WebSocketsOnly) {
saveTimeseries = false;
saveLatest = false;
sendWsUpdate = true;
} else if (persistenceSettings instanceof Deduplicate deduplicate) {
boolean isFirstMsgInInterval = deduplicate.getDeduplicateStrategy().shouldPersist(ts, originatorUuid);
saveTimeseries = isFirstMsgInInterval;
saveLatest = isFirstMsgInInterval;
sendWsUpdate = isFirstMsgInInterval;
} else if (persistenceSettings instanceof Advanced advanced) {
saveTimeseries = advanced.timeseries().shouldPersist(ts, originatorUuid);
saveLatest = advanced.latest().shouldPersist(ts, originatorUuid);
sendWsUpdate = advanced.webSockets().shouldPersist(ts, originatorUuid);
} else { // should not happen
throw new IllegalArgumentException("Unknown persistence settings type: " + persistenceSettings.getClass().getSimpleName());
}
return new PersistenceDecision(saveTimeseries, saveLatest, sendWsUpdate);
}
@Override
public void destroy() {
ctx.removeListeners();

View File

@ -15,22 +15,77 @@
*/
package org.thingsboard.rule.engine.telemetry;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
import lombok.Getter;
import org.thingsboard.rule.engine.api.NodeConfiguration;
import org.thingsboard.rule.engine.telemetry.strategy.PersistenceStrategy;
import java.util.Objects;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Deduplicate;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage;
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.WebSocketsOnly;
@Data
public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration<TbMsgTimeseriesNodeConfiguration> {
private long defaultTTL;
private boolean skipLatestPersistence;
private boolean useServerTs;
private PersistenceSettings persistenceSettings;
@Override
public TbMsgTimeseriesNodeConfiguration defaultConfiguration() {
TbMsgTimeseriesNodeConfiguration configuration = new TbMsgTimeseriesNodeConfiguration();
configuration.setDefaultTTL(0L);
configuration.setSkipLatestPersistence(false);
configuration.setUseServerTs(false);
configuration.setPersistenceSettings(new OnEveryMessage());
return configuration;
}
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type"
)
@JsonSubTypes({
@JsonSubTypes.Type(value = OnEveryMessage.class, name = "ON_EVERY_MESSAGE"),
@JsonSubTypes.Type(value = WebSocketsOnly.class, name = "WEBSOCKETS_ONLY"),
@JsonSubTypes.Type(value = Deduplicate.class, name = "DEDUPLICATE"),
@JsonSubTypes.Type(value = Advanced.class, name = "ADVANCED")
})
sealed interface PersistenceSettings permits OnEveryMessage, Deduplicate, WebSocketsOnly, Advanced {
record OnEveryMessage() implements PersistenceSettings {}
record WebSocketsOnly() implements PersistenceSettings {}
@Getter
final class Deduplicate implements PersistenceSettings {
public final PersistenceStrategy deduplicateStrategy;
@JsonCreator
private Deduplicate(@JsonProperty("deduplicationIntervalSecs") int deduplicationIntervalSecs) {
this.deduplicateStrategy = PersistenceStrategy.deduplicate(deduplicationIntervalSecs);
}
}
record Advanced(PersistenceStrategy timeseries, PersistenceStrategy latest, PersistenceStrategy webSockets) implements PersistenceSettings {
public Advanced {
Objects.requireNonNull(timeseries);
Objects.requireNonNull(latest);
Objects.requireNonNull(webSockets);
}
}
}
}

View File

@ -0,0 +1,54 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry.strategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.Sets;
import java.time.Duration;
import java.util.Set;
import java.util.UUID;
final class DeduplicatePersistenceStrategy implements PersistenceStrategy {
private static final int MIN_DEDUPLICATION_INTERVAL_SECS = 1;
private final long deduplicationIntervalMillis;
private final LoadingCache<Long, Set<UUID>> deduplicationCache;
@JsonCreator
public DeduplicatePersistenceStrategy(@JsonProperty("deduplicationIntervalSecs") int deduplicationIntervalSecs) {
if (deduplicationIntervalSecs < MIN_DEDUPLICATION_INTERVAL_SECS) {
throw new IllegalArgumentException("Deduplication interval must be at least " + MIN_DEDUPLICATION_INTERVAL_SECS + " second(s), was " + deduplicationIntervalSecs + " second(s)");
}
deduplicationIntervalMillis = Duration.ofSeconds(deduplicationIntervalSecs).toMillis();
deduplicationCache = Caffeine.newBuilder()
.softValues()
.expireAfterAccess(Duration.ofSeconds(deduplicationIntervalSecs * 10L))
.maximumSize(20L)
.build(__ -> Sets.newConcurrentHashSet());
}
@Override
public boolean shouldPersist(long ts, UUID originatorUuid) {
long intervalNumber = ts / deduplicationIntervalMillis;
return deduplicationCache.get(intervalNumber).add(originatorUuid);
}
}

View File

@ -0,0 +1,38 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry.strategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import java.util.UUID;
final class OnEveryMessagePersistenceStrategy implements PersistenceStrategy {
private static final OnEveryMessagePersistenceStrategy INSTANCE = new OnEveryMessagePersistenceStrategy();
private OnEveryMessagePersistenceStrategy() {}
@JsonCreator
public static OnEveryMessagePersistenceStrategy getInstance() {
return INSTANCE;
}
@Override
public boolean shouldPersist(long ts, UUID originatorUuid) {
return true;
}
}

View File

@ -0,0 +1,49 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry.strategy;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.util.UUID;
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type"
)
@JsonSubTypes({
@JsonSubTypes.Type(value = OnEveryMessagePersistenceStrategy.class, name = "ON_EVERY_MESSAGE"),
@JsonSubTypes.Type(value = DeduplicatePersistenceStrategy.class, name = "DEDUPLICATE"),
@JsonSubTypes.Type(value = SkipPersistenceStrategy.class, name = "SKIP")
})
public sealed interface PersistenceStrategy permits OnEveryMessagePersistenceStrategy, DeduplicatePersistenceStrategy, SkipPersistenceStrategy {
static PersistenceStrategy onEveryMessage() {
return OnEveryMessagePersistenceStrategy.getInstance();
}
static PersistenceStrategy deduplicate(int deduplicationIntervalSecs) {
return new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
}
static PersistenceStrategy skip() {
return SkipPersistenceStrategy.getInstance();
}
boolean shouldPersist(long ts, UUID originatorUuid);
}

View File

@ -0,0 +1,38 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry.strategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import java.util.UUID;
final class SkipPersistenceStrategy implements PersistenceStrategy {
private static final SkipPersistenceStrategy INSTANCE = new SkipPersistenceStrategy();
private SkipPersistenceStrategy() {}
@JsonCreator
public static SkipPersistenceStrategy getInstance() {
return INSTANCE;
}
@Override
public boolean shouldPersist(long ts, UUID originatorUuid) {
return false;
}
}

View File

@ -33,6 +33,7 @@ import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.rule.engine.telemetry.strategy.PersistenceStrategy;
import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.DeviceId;
@ -88,7 +89,7 @@ public class TbMsgTimeseriesNodeTest {
@Test
public void verifyDefaultConfig() {
assertThat(config.getDefaultTTL()).isEqualTo(0L);
assertThat(config.isSkipLatestPersistence()).isFalse();
assertThat(config.getPersistenceSettings()).isInstanceOf(TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage.class);
assertThat(config.isUseServerTs()).isFalse();
}
@ -162,7 +163,13 @@ public class TbMsgTimeseriesNodeTest {
public void givenSkipLatestPersistenceIsTrueAndTtlFromConfig_whenOnMsg_thenSaveTimeseriesUsingTtlFromConfig() throws TbNodeException {
long ttlFromConfig = 5L;
config.setDefaultTTL(ttlFromConfig);
config.setSkipLatestPersistence(true);
var timeseriesStrategy = PersistenceStrategy.onEveryMessage();
var latestStrategy = PersistenceStrategy.skip();
var webSockets = PersistenceStrategy.onEveryMessage();
var persistenceSettings = new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced(timeseriesStrategy, latestStrategy, webSockets);
config.setPersistenceSettings(persistenceSettings);
init();
String data = """
@ -197,7 +204,9 @@ public class TbMsgTimeseriesNodeTest {
assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);
assertThat(request.getEntries()).containsExactlyElementsOf(expectedList);
assertThat(request.getTtl()).isEqualTo(ttlFromConfig);
assertThat(request.isSaveTimeseries()).isTrue();
assertThat(request.isSaveLatest()).isFalse();
assertThat(request.isSendWsUpdate()).isTrue();
assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class);
}));
verify(ctxMock).tellSuccess(msg);