From e5003df778701a7191a577fb784d97cfc0c47eb4 Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Mon, 30 Dec 2024 20:30:44 +0200 Subject: [PATCH] [WIP] save time series strategies: draft implementation for latest using Bloom filter --- .../DoNotSavePersistenceStrategy.java | 34 ++++++++++++ .../engine/telemetry/PersistenceStrategy.java | 35 ++++++++++++ .../SaveEveryMessagePersistenceStrategy.java | 34 ++++++++++++ ...aveFirstInIntervalPersistenceStrategy.java | 54 +++++++++++++++++++ .../engine/telemetry/TbMsgTimeseriesNode.java | 52 +++++++++++++++--- .../TbMsgTimeseriesNodeConfiguration.java | 12 ++++- .../telemetry/TbMsgTimeseriesNodeTest.java | 4 +- 7 files changed, 213 insertions(+), 12 deletions(-) create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/DoNotSavePersistenceStrategy.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/PersistenceStrategy.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/SaveEveryMessagePersistenceStrategy.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/SaveFirstInIntervalPersistenceStrategy.java diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/DoNotSavePersistenceStrategy.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/DoNotSavePersistenceStrategy.java new file mode 100644 index 0000000000..5652a34cd1 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/DoNotSavePersistenceStrategy.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry; + +import org.thingsboard.server.common.data.kv.TsKvEntry; + +import java.util.UUID; + +public final class DoNotSavePersistenceStrategy implements PersistenceStrategy { + + public static final DoNotSavePersistenceStrategy INSTANCE = new DoNotSavePersistenceStrategy(); + + private DoNotSavePersistenceStrategy() { + } + + @Override + public boolean shouldPersist(UUID originatorUuid, TsKvEntry timeseriesEntry) { + return false; + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/PersistenceStrategy.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/PersistenceStrategy.java new file mode 100644 index 0000000000..2aa92bd57c --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/PersistenceStrategy.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.thingsboard.server.common.data.kv.TsKvEntry; + +import java.util.UUID; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = SaveEveryMessagePersistenceStrategy.class, name = "SAVE_EVERY_MESSAGE"), + @JsonSubTypes.Type(value = SaveFirstInIntervalPersistenceStrategy.class, name = "SAVE_FIRST_IN_INTERVAL"), + @JsonSubTypes.Type(value = DoNotSavePersistenceStrategy.class, name = "DO_NOT_SAVE") +}) +public sealed interface PersistenceStrategy permits DoNotSavePersistenceStrategy, SaveEveryMessagePersistenceStrategy, SaveFirstInIntervalPersistenceStrategy { + + // TODO: maybe this should accept generic key? + boolean shouldPersist(UUID originatorUuid, TsKvEntry timeseriesEntry); + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/SaveEveryMessagePersistenceStrategy.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/SaveEveryMessagePersistenceStrategy.java new file mode 100644 index 0000000000..7ca564b0bc --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/SaveEveryMessagePersistenceStrategy.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry; + +import org.thingsboard.server.common.data.kv.TsKvEntry; + +import java.util.UUID; + +public final class SaveEveryMessagePersistenceStrategy implements PersistenceStrategy { + + public static final SaveEveryMessagePersistenceStrategy INSTANCE = new SaveEveryMessagePersistenceStrategy(); + + private SaveEveryMessagePersistenceStrategy() { + } + + @Override + public boolean shouldPersist(UUID originatorUuid, TsKvEntry timeseriesEntry) { + return true; + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/SaveFirstInIntervalPersistenceStrategy.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/SaveFirstInIntervalPersistenceStrategy.java new file mode 100644 index 0000000000..4677058e6a --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/SaveFirstInIntervalPersistenceStrategy.java @@ -0,0 +1,54 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.hash.BloomFilter; +import org.thingsboard.server.common.data.kv.TsKvEntry; + +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +@SuppressWarnings("UnstableApiUsage") +public final class SaveFirstInIntervalPersistenceStrategy implements PersistenceStrategy { + + private final long intervalDurationMillis; + private final BloomFilter filter; + + @JsonCreator + public SaveFirstInIntervalPersistenceStrategy(@JsonProperty("intervalDurationMillis") long intervalDurationMillis) { + this.intervalDurationMillis = intervalDurationMillis; + // TODO: implement funnel as an enum + filter = BloomFilter.create((key, sink) -> + sink.putLong(key.intervalNumber()) + .putLong(key.originatorUuid().getMostSignificantBits()) + .putLong(key.originatorUuid().getLeastSignificantBits()) + .putString(key.timeseriesKey(), StandardCharsets.UTF_8), 1_000_000); + } + + // TODO: this should not be hardcoded here but should be defined by clients + // should be generified (what to do with funnel then?) + private record Key(long intervalNumber, UUID originatorUuid, String timeseriesKey) { + } + + @Override + public boolean shouldPersist(UUID originatorUuid, TsKvEntry timeseriesEntry) { + long intervalNumber = timeseriesEntry.getTs() / intervalDurationMillis; + return filter.put(new Key(intervalNumber, originatorUuid, timeseriesEntry.getKey())); + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index 27f45feb47..28f9c88e34 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -15,8 +15,12 @@ */ package org.thingsboard.rule.engine.telemetry; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; @@ -27,6 +31,9 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; @@ -59,7 +66,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_RE "The DB layer has certain optimizations to ignore the updates of the \"attributes\" and \"latest values\" tables if the new record has a timestamp that is older than the previous record. " + "So, to make sure that all the messages will be processed correctly, one should enable this parameter for sequential message processing scenarios.", uiResources = {"static/rulenode/rulenode-core-config.js"}, - configDirective = "tbActionNodeTimeseriesConfig", + // configDirective = "tbActionNodeTimeseriesConfig", icon = "file_upload" ) public class TbMsgTimeseriesNode implements TbNode { @@ -68,10 +75,13 @@ public class TbMsgTimeseriesNode implements TbNode { private TbContext ctx; private long tenantProfileDefaultStorageTtl; + private PersistenceStrategy latestPersistenceStrategy; + @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbMsgTimeseriesNodeConfiguration.class); this.ctx = ctx; + latestPersistenceStrategy = config.getPersistenceConfig().latest(); ctx.addTenantProfileListener(this::onTenantProfileUpdate); onTenantProfileUpdate(ctx.getTenantProfile()); } @@ -94,10 +104,18 @@ public class TbMsgTimeseriesNode implements TbNode { ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + src)); return; } - List tsKvEntryList = new ArrayList<>(); + List withLatest = new ArrayList<>(); + List withoutLatest = new ArrayList<>(); for (Map.Entry> tsKvEntry : tsKvMap.entrySet()) { for (KvEntry kvEntry : tsKvEntry.getValue()) { - tsKvEntryList.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry)); + TsKvEntry entry = new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry); + if (latestPersistenceStrategy.shouldPersist(msg.getOriginator().getId(), entry)) { + log.info("Persisting entry: {}", entry); + withLatest.add(entry); + } else { + log.info("Skipping entry: {}", entry); + withoutLatest.add(entry); + } } } String ttlValue = msg.getMetaData().getValue("TTL"); @@ -105,15 +123,33 @@ public class TbMsgTimeseriesNode implements TbNode { if (ttl == 0L) { ttl = tenantProfileDefaultStorageTtl; } - ctx.getTelemetryService().saveTimeseries(TimeseriesSaveRequest.builder() + + SettableFuture withLatestSavedFuture = SettableFuture.create(); + TimeseriesSaveRequest saveWithLatestRequest = TimeseriesSaveRequest.builder() .tenantId(ctx.getTenantId()) .customerId(msg.getCustomerId()) .entityId(msg.getOriginator()) - .entries(tsKvEntryList) + .entries(withLatest) .ttl(ttl) - .saveLatest(!config.isSkipLatestPersistence()) - .callback(new TelemetryNodeCallback(ctx, msg)) - .build()); + .saveLatest(true) + .future(withLatestSavedFuture) + .build(); + ctx.getTelemetryService().saveTimeseries(saveWithLatestRequest); + + SettableFuture withoutLatestSavedFuture = SettableFuture.create(); + TimeseriesSaveRequest saveWithoutLatestRequest = TimeseriesSaveRequest.builder() + .tenantId(ctx.getTenantId()) + .customerId(msg.getCustomerId()) + .entityId(msg.getOriginator()) + .entries(withoutLatest) + .ttl(ttl) + .saveLatest(false) + .future(withoutLatestSavedFuture) + .build(); + ctx.getTelemetryService().saveTimeseries(saveWithoutLatestRequest); + + ListenableFuture> bothSavedFuture = Futures.allAsList(withLatestSavedFuture, withoutLatestSavedFuture); + DonAsynchron.withCallback(bothSavedFuture, success -> ctx.tellSuccess(msg), failure -> ctx.tellFailure(msg, failure)); } public static long computeTs(TbMsg msg, boolean ignoreMetadataTs) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java index 1c33778a6b..adaebb0967 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.telemetry; +import lombok.Builder; import lombok.Data; import org.thingsboard.rule.engine.api.NodeConfiguration; @@ -22,15 +23,22 @@ import org.thingsboard.rule.engine.api.NodeConfiguration; public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration { private long defaultTTL; - private boolean skipLatestPersistence; private boolean useServerTs; + private PersistenceConfig persistenceConfig; @Override public TbMsgTimeseriesNodeConfiguration defaultConfiguration() { TbMsgTimeseriesNodeConfiguration configuration = new TbMsgTimeseriesNodeConfiguration(); configuration.setDefaultTTL(0L); - configuration.setSkipLatestPersistence(false); configuration.setUseServerTs(false); + configuration.setPersistenceConfig(PersistenceConfig.builder() + .latest(SaveEveryMessagePersistenceStrategy.INSTANCE) + .build()); return configuration; } + + @Builder + record PersistenceConfig(PersistenceStrategy latest) { + } + } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java index 2cba4b8fb3..74240d3f97 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java @@ -88,7 +88,7 @@ public class TbMsgTimeseriesNodeTest { @Test public void verifyDefaultConfig() { assertThat(config.getDefaultTTL()).isEqualTo(0L); - assertThat(config.isSkipLatestPersistence()).isFalse(); + // assertThat(config.isSkipLatestPersistence()).isFalse(); assertThat(config.isUseServerTs()).isFalse(); } @@ -162,7 +162,7 @@ public class TbMsgTimeseriesNodeTest { public void givenSkipLatestPersistenceIsTrueAndTtlFromConfig_whenOnMsg_thenSaveTimeseriesUsingTtlFromConfig() throws TbNodeException { long ttlFromConfig = 5L; config.setDefaultTTL(ttlFromConfig); - config.setSkipLatestPersistence(true); + // config.setSkipLatestPersistence(true); init(); String data = """