diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java index 686e85cb1e..8e0a901eed 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java @@ -348,7 +348,8 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen .tenantId(entityView.getTenantId()) .entityId(entityId) .entries(latestValues) - .onlyLatest(true) + .saveTimeseries(false) + .saveLatest(true) .callback(new FutureCallback() { @Override public void onSuccess(@Nullable Void tmp) { diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index d4444d5d7f..d7ff5fa5b7 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -118,10 +118,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer EntityId entityId = request.getEntityId(); checkInternalEntity(entityId); boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null; - if (sysTenant || request.isOnlyLatest() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { + if (sysTenant || !request.isSaveTimeseries() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { KvUtils.validate(request.getEntries(), valueNoXssValidation); ListenableFuture future = saveTimeseriesInternal(request); - if (!request.isOnlyLatest()) { + if (request.isSaveTimeseries()) { FutureCallback callback = getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback()); Futures.addCallback(future, callback, tsCallBackExecutor); } @@ -135,17 +135,21 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer TenantId tenantId = request.getTenantId(); EntityId entityId = request.getEntityId(); ListenableFuture saveFuture; - if (request.isOnlyLatest()) { - saveFuture = Futures.transform(tsService.saveLatest(tenantId, entityId, request.getEntries()), result -> 0, MoreExecutors.directExecutor()); - } else if (request.isSaveLatest()) { + if (request.isSaveTimeseries() && request.isSaveLatest()) { saveFuture = tsService.save(tenantId, entityId, request.getEntries(), request.getTtl()); - } else { + } else if (request.isSaveLatest()) { + saveFuture = Futures.transform(tsService.saveLatest(tenantId, entityId, request.getEntries()), result -> 0, MoreExecutors.directExecutor()); + } else if (request.isSaveTimeseries()) { saveFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl()); + } else { + saveFuture = Futures.immediateFuture(0); } addMainCallback(saveFuture, request.getCallback()); - addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); - if (request.isSaveLatest() && !request.isOnlyLatest()) { + if (request.isSendWsUpdate()) { + addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); + } + if (request.isSaveTimeseries() && request.isSaveLatest()) { addEntityViewCallback(tenantId, entityId, request.getEntries()); } return saveFuture; @@ -232,7 +236,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer .tenantId(tenantId) .entityId(entityView.getId()) .entries(entityViewLatest) - .onlyLatest(true) + .saveTimeseries(false) + .saveLatest(true) .callback(new FutureCallback<>() { @Override public void onSuccess(@Nullable Void tmp) {} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java index 2b5881212d..1ad0fd3408 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java @@ -38,8 +38,9 @@ public class TimeseriesSaveRequest { private final EntityId entityId; private final List entries; private final long ttl; + private final boolean saveTimeseries; private final boolean saveLatest; - private final boolean onlyLatest; + private final boolean sendWsUpdate; private final FutureCallback callback; public static Builder builder() { @@ -53,9 +54,10 @@ public class TimeseriesSaveRequest { private EntityId entityId; private List entries; private long ttl; - private FutureCallback callback; + private boolean saveTimeseries = true; private boolean saveLatest = true; - private boolean onlyLatest; + private boolean sendWsUpdate = true; + private FutureCallback callback; Builder() {} @@ -92,14 +94,18 @@ public class TimeseriesSaveRequest { return this; } + public Builder saveTimeseries(boolean saveTimeseries) { + this.saveTimeseries = saveTimeseries; + return this; + } + public Builder saveLatest(boolean saveLatest) { this.saveLatest = saveLatest; return this; } - public Builder onlyLatest(boolean onlyLatest) { - this.onlyLatest = onlyLatest; - this.saveLatest = true; + public Builder sendWsUpdate(boolean sendWsUpdate) { + this.sendWsUpdate = sendWsUpdate; return this; } @@ -123,7 +129,7 @@ public class TimeseriesSaveRequest { } public TimeseriesSaveRequest build() { - return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, onlyLatest, callback); + return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveTimeseries, saveLatest, sendWsUpdate, callback); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index 27f45feb47..b3eea21ece 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -37,8 +37,14 @@ import org.thingsboard.server.common.msg.TbMsg; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings; +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced; +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Deduplicate; +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage; +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.WebSocketsOnly; import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST; @Slf4j @@ -68,12 +74,15 @@ public class TbMsgTimeseriesNode implements TbNode { private TbContext ctx; private long tenantProfileDefaultStorageTtl; + private PersistenceSettings persistenceSettings; + @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbMsgTimeseriesNodeConfiguration.class); this.ctx = ctx; ctx.addTenantProfileListener(this::onTenantProfileUpdate); onTenantProfileUpdate(ctx.getTenantProfile()); + persistenceSettings = config.getPersistenceSettings(); } void onTenantProfileUpdate(TenantProfile tenantProfile) { @@ -88,6 +97,18 @@ public class TbMsgTimeseriesNode implements TbNode { return; } long ts = computeTs(msg, config.isUseServerTs()); + + PersistenceDecision persistenceDecision = makePersistenceDecision(ts, msg.getOriginator().getId()); + boolean saveTimeseries = persistenceDecision.saveTimeseries(); + boolean saveLatest = persistenceDecision.saveLatest(); + boolean sendWsUpdate = persistenceDecision.sendWsUpdate(); + + // short-circuit + if (!saveTimeseries && !saveLatest && !sendWsUpdate) { + ctx.tellSuccess(msg); + return; + } + String src = msg.getData(); Map> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(src), ts); if (tsKvMap.isEmpty()) { @@ -111,7 +132,9 @@ public class TbMsgTimeseriesNode implements TbNode { .entityId(msg.getOriginator()) .entries(tsKvEntryList) .ttl(ttl) - .saveLatest(!config.isSkipLatestPersistence()) + .saveTimeseries(saveTimeseries) + .saveLatest(saveLatest) + .sendWsUpdate(sendWsUpdate) .callback(new TelemetryNodeCallback(ctx, msg)) .build()); } @@ -120,6 +143,37 @@ public class TbMsgTimeseriesNode implements TbNode { return ignoreMetadataTs ? System.currentTimeMillis() : msg.getMetaDataTs(); } + private record PersistenceDecision(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate) {} + + private PersistenceDecision makePersistenceDecision(long ts, UUID originatorUuid) { + boolean saveTimeseries; + boolean saveLatest; + boolean sendWsUpdate; + + if (persistenceSettings instanceof OnEveryMessage) { + saveTimeseries = true; + saveLatest = true; + sendWsUpdate = true; + } else if (persistenceSettings instanceof WebSocketsOnly) { + saveTimeseries = false; + saveLatest = false; + sendWsUpdate = true; + } else if (persistenceSettings instanceof Deduplicate deduplicate) { + boolean isFirstMsgInInterval = deduplicate.getDeduplicateStrategy().shouldPersist(ts, originatorUuid); + saveTimeseries = isFirstMsgInInterval; + saveLatest = isFirstMsgInInterval; + sendWsUpdate = isFirstMsgInInterval; + } else if (persistenceSettings instanceof Advanced advanced) { + saveTimeseries = advanced.timeseries().shouldPersist(ts, originatorUuid); + saveLatest = advanced.latest().shouldPersist(ts, originatorUuid); + sendWsUpdate = advanced.webSockets().shouldPersist(ts, originatorUuid); + } else { // should not happen + throw new IllegalArgumentException("Unknown persistence settings type: " + persistenceSettings.getClass().getSimpleName()); + } + + return new PersistenceDecision(saveTimeseries, saveLatest, sendWsUpdate); + } + @Override public void destroy() { ctx.removeListeners(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java index 1c33778a6b..8be8cac24e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java @@ -15,22 +15,77 @@ */ package org.thingsboard.rule.engine.telemetry; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import lombok.Data; +import lombok.Getter; import org.thingsboard.rule.engine.api.NodeConfiguration; +import org.thingsboard.rule.engine.telemetry.strategy.PersistenceStrategy; + +import java.util.Objects; + +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced; +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Deduplicate; +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage; +import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.WebSocketsOnly; @Data public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration { private long defaultTTL; - private boolean skipLatestPersistence; private boolean useServerTs; + private PersistenceSettings persistenceSettings; @Override public TbMsgTimeseriesNodeConfiguration defaultConfiguration() { TbMsgTimeseriesNodeConfiguration configuration = new TbMsgTimeseriesNodeConfiguration(); configuration.setDefaultTTL(0L); - configuration.setSkipLatestPersistence(false); configuration.setUseServerTs(false); + configuration.setPersistenceSettings(new OnEveryMessage()); return configuration; } + + @JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "type" + ) + @JsonSubTypes({ + @JsonSubTypes.Type(value = OnEveryMessage.class, name = "ON_EVERY_MESSAGE"), + @JsonSubTypes.Type(value = WebSocketsOnly.class, name = "WEBSOCKETS_ONLY"), + @JsonSubTypes.Type(value = Deduplicate.class, name = "DEDUPLICATE"), + @JsonSubTypes.Type(value = Advanced.class, name = "ADVANCED") + }) + sealed interface PersistenceSettings permits OnEveryMessage, Deduplicate, WebSocketsOnly, Advanced { + + record OnEveryMessage() implements PersistenceSettings {} + + record WebSocketsOnly() implements PersistenceSettings {} + + @Getter + final class Deduplicate implements PersistenceSettings { + + public final PersistenceStrategy deduplicateStrategy; + + @JsonCreator + private Deduplicate(@JsonProperty("deduplicationIntervalSecs") int deduplicationIntervalSecs) { + this.deduplicateStrategy = PersistenceStrategy.deduplicate(deduplicationIntervalSecs); + } + + } + + record Advanced(PersistenceStrategy timeseries, PersistenceStrategy latest, PersistenceStrategy webSockets) implements PersistenceSettings { + + public Advanced { + Objects.requireNonNull(timeseries); + Objects.requireNonNull(latest); + Objects.requireNonNull(webSockets); + } + + } + + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategy.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategy.java new file mode 100644 index 0000000000..05333ce035 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/DeduplicatePersistenceStrategy.java @@ -0,0 +1,54 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry.strategy; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.collect.Sets; + +import java.time.Duration; +import java.util.Set; +import java.util.UUID; + +final class DeduplicatePersistenceStrategy implements PersistenceStrategy { + + private static final int MIN_DEDUPLICATION_INTERVAL_SECS = 1; + + private final long deduplicationIntervalMillis; + private final LoadingCache> deduplicationCache; + + @JsonCreator + public DeduplicatePersistenceStrategy(@JsonProperty("deduplicationIntervalSecs") int deduplicationIntervalSecs) { + if (deduplicationIntervalSecs < MIN_DEDUPLICATION_INTERVAL_SECS) { + throw new IllegalArgumentException("Deduplication interval must be at least " + MIN_DEDUPLICATION_INTERVAL_SECS + " second(s), was " + deduplicationIntervalSecs + " second(s)"); + } + deduplicationIntervalMillis = Duration.ofSeconds(deduplicationIntervalSecs).toMillis(); + deduplicationCache = Caffeine.newBuilder() + .softValues() + .expireAfterAccess(Duration.ofSeconds(deduplicationIntervalSecs * 10L)) + .maximumSize(20L) + .build(__ -> Sets.newConcurrentHashSet()); + } + + @Override + public boolean shouldPersist(long ts, UUID originatorUuid) { + long intervalNumber = ts / deduplicationIntervalMillis; + return deduplicationCache.get(intervalNumber).add(originatorUuid); + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/OnEveryMessagePersistenceStrategy.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/OnEveryMessagePersistenceStrategy.java new file mode 100644 index 0000000000..4fcb74dc33 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/OnEveryMessagePersistenceStrategy.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry.strategy; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.UUID; + +final class OnEveryMessagePersistenceStrategy implements PersistenceStrategy { + + private static final OnEveryMessagePersistenceStrategy INSTANCE = new OnEveryMessagePersistenceStrategy(); + + private OnEveryMessagePersistenceStrategy() {} + + @JsonCreator + public static OnEveryMessagePersistenceStrategy getInstance() { + return INSTANCE; + } + + @Override + public boolean shouldPersist(long ts, UUID originatorUuid) { + return true; + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/PersistenceStrategy.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/PersistenceStrategy.java new file mode 100644 index 0000000000..453092117b --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/PersistenceStrategy.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry.strategy; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import java.util.UUID; + +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "type" +) +@JsonSubTypes({ + @JsonSubTypes.Type(value = OnEveryMessagePersistenceStrategy.class, name = "ON_EVERY_MESSAGE"), + @JsonSubTypes.Type(value = DeduplicatePersistenceStrategy.class, name = "DEDUPLICATE"), + @JsonSubTypes.Type(value = SkipPersistenceStrategy.class, name = "SKIP") +}) +public sealed interface PersistenceStrategy permits OnEveryMessagePersistenceStrategy, DeduplicatePersistenceStrategy, SkipPersistenceStrategy { + + static PersistenceStrategy onEveryMessage() { + return OnEveryMessagePersistenceStrategy.getInstance(); + } + + static PersistenceStrategy deduplicate(int deduplicationIntervalSecs) { + return new DeduplicatePersistenceStrategy(deduplicationIntervalSecs); + } + + static PersistenceStrategy skip() { + return SkipPersistenceStrategy.getInstance(); + } + + boolean shouldPersist(long ts, UUID originatorUuid); + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/SkipPersistenceStrategy.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/SkipPersistenceStrategy.java new file mode 100644 index 0000000000..c3d96e8ca7 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/strategy/SkipPersistenceStrategy.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry.strategy; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.UUID; + +final class SkipPersistenceStrategy implements PersistenceStrategy { + + private static final SkipPersistenceStrategy INSTANCE = new SkipPersistenceStrategy(); + + private SkipPersistenceStrategy() {} + + @JsonCreator + public static SkipPersistenceStrategy getInstance() { + return INSTANCE; + } + + @Override + public boolean shouldPersist(long ts, UUID originatorUuid) { + return false; + } + +} diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java index 2cba4b8fb3..ba8c614944 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java @@ -33,6 +33,7 @@ import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.rule.engine.telemetry.strategy.PersistenceStrategy; import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.DeviceId; @@ -88,7 +89,7 @@ public class TbMsgTimeseriesNodeTest { @Test public void verifyDefaultConfig() { assertThat(config.getDefaultTTL()).isEqualTo(0L); - assertThat(config.isSkipLatestPersistence()).isFalse(); + assertThat(config.getPersistenceSettings()).isInstanceOf(TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage.class); assertThat(config.isUseServerTs()).isFalse(); } @@ -162,7 +163,13 @@ public class TbMsgTimeseriesNodeTest { public void givenSkipLatestPersistenceIsTrueAndTtlFromConfig_whenOnMsg_thenSaveTimeseriesUsingTtlFromConfig() throws TbNodeException { long ttlFromConfig = 5L; config.setDefaultTTL(ttlFromConfig); - config.setSkipLatestPersistence(true); + + var timeseriesStrategy = PersistenceStrategy.onEveryMessage(); + var latestStrategy = PersistenceStrategy.skip(); + var webSockets = PersistenceStrategy.onEveryMessage(); + var persistenceSettings = new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced(timeseriesStrategy, latestStrategy, webSockets); + config.setPersistenceSettings(persistenceSettings); + init(); String data = """ @@ -197,7 +204,9 @@ public class TbMsgTimeseriesNodeTest { assertThat(request.getEntityId()).isEqualTo(DEVICE_ID); assertThat(request.getEntries()).containsExactlyElementsOf(expectedList); assertThat(request.getTtl()).isEqualTo(ttlFromConfig); + assertThat(request.isSaveTimeseries()).isTrue(); assertThat(request.isSaveLatest()).isFalse(); + assertThat(request.isSendWsUpdate()).isTrue(); assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class); })); verify(ctxMock).tellSuccess(msg);