[WIP] save time series strategies: draft implementation for latest using Bloom filter
This commit is contained in:
		
							parent
							
								
									ace60d9008
								
							
						
					
					
						commit
						e5003df778
					
				@ -0,0 +1,34 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2024 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.telemetry;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
public final class DoNotSavePersistenceStrategy implements PersistenceStrategy {
 | 
			
		||||
 | 
			
		||||
    public static final DoNotSavePersistenceStrategy INSTANCE = new DoNotSavePersistenceStrategy();
 | 
			
		||||
 | 
			
		||||
    private DoNotSavePersistenceStrategy() {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean shouldPersist(UUID originatorUuid, TsKvEntry timeseriesEntry) {
 | 
			
		||||
        return false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,35 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2024 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.telemetry;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
 | 
			
		||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
 | 
			
		||||
@JsonSubTypes({
 | 
			
		||||
        @JsonSubTypes.Type(value = SaveEveryMessagePersistenceStrategy.class, name = "SAVE_EVERY_MESSAGE"),
 | 
			
		||||
        @JsonSubTypes.Type(value = SaveFirstInIntervalPersistenceStrategy.class, name = "SAVE_FIRST_IN_INTERVAL"),
 | 
			
		||||
        @JsonSubTypes.Type(value = DoNotSavePersistenceStrategy.class, name = "DO_NOT_SAVE")
 | 
			
		||||
})
 | 
			
		||||
public sealed interface PersistenceStrategy permits DoNotSavePersistenceStrategy, SaveEveryMessagePersistenceStrategy, SaveFirstInIntervalPersistenceStrategy {
 | 
			
		||||
 | 
			
		||||
    // TODO: maybe this should accept generic key?
 | 
			
		||||
    boolean shouldPersist(UUID originatorUuid, TsKvEntry timeseriesEntry);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,34 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2024 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.telemetry;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
public final class SaveEveryMessagePersistenceStrategy implements PersistenceStrategy {
 | 
			
		||||
 | 
			
		||||
    public static final SaveEveryMessagePersistenceStrategy INSTANCE = new SaveEveryMessagePersistenceStrategy();
 | 
			
		||||
 | 
			
		||||
    private SaveEveryMessagePersistenceStrategy() {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean shouldPersist(UUID originatorUuid, TsKvEntry timeseriesEntry) {
 | 
			
		||||
        return true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,54 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2024 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.telemetry;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.annotation.JsonCreator;
 | 
			
		||||
import com.fasterxml.jackson.annotation.JsonProperty;
 | 
			
		||||
import com.google.common.hash.BloomFilter;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
 | 
			
		||||
import java.nio.charset.StandardCharsets;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
@SuppressWarnings("UnstableApiUsage")
 | 
			
		||||
public final class SaveFirstInIntervalPersistenceStrategy implements PersistenceStrategy {
 | 
			
		||||
 | 
			
		||||
    private final long intervalDurationMillis;
 | 
			
		||||
    private final BloomFilter<Key> filter;
 | 
			
		||||
 | 
			
		||||
    @JsonCreator
 | 
			
		||||
    public SaveFirstInIntervalPersistenceStrategy(@JsonProperty("intervalDurationMillis") long intervalDurationMillis) {
 | 
			
		||||
        this.intervalDurationMillis = intervalDurationMillis;
 | 
			
		||||
        // TODO: implement funnel as an enum
 | 
			
		||||
        filter = BloomFilter.create((key, sink) ->
 | 
			
		||||
                sink.putLong(key.intervalNumber())
 | 
			
		||||
                        .putLong(key.originatorUuid().getMostSignificantBits())
 | 
			
		||||
                        .putLong(key.originatorUuid().getLeastSignificantBits())
 | 
			
		||||
                        .putString(key.timeseriesKey(), StandardCharsets.UTF_8), 1_000_000);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // TODO: this should not be hardcoded here but should be defined by clients
 | 
			
		||||
    //       should be generified (what to do with funnel then?)
 | 
			
		||||
    private record Key(long intervalNumber, UUID originatorUuid, String timeseriesKey) {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean shouldPersist(UUID originatorUuid, TsKvEntry timeseriesEntry) {
 | 
			
		||||
        long intervalNumber = timeseriesEntry.getTs() / intervalDurationMillis;
 | 
			
		||||
        return filter.put(new Key(intervalNumber, originatorUuid, timeseriesEntry.getKey()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -15,8 +15,12 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.telemetry;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.SettableFuture;
 | 
			
		||||
import com.google.gson.JsonParser;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.common.util.DonAsynchron;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
@ -27,6 +31,9 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.server.common.adaptor.JsonConverter;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.TenantProfile;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CustomerId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.KvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
@ -59,7 +66,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_RE
 | 
			
		||||
                "The DB layer has certain optimizations to ignore the updates of the \"attributes\" and \"latest values\" tables if the new record has a timestamp that is older than the previous record. " +
 | 
			
		||||
                "So, to make sure that all the messages will be processed correctly, one should enable this parameter for sequential message processing scenarios.",
 | 
			
		||||
        uiResources = {"static/rulenode/rulenode-core-config.js"},
 | 
			
		||||
        configDirective = "tbActionNodeTimeseriesConfig",
 | 
			
		||||
        // configDirective = "tbActionNodeTimeseriesConfig",
 | 
			
		||||
        icon = "file_upload"
 | 
			
		||||
)
 | 
			
		||||
public class TbMsgTimeseriesNode implements TbNode {
 | 
			
		||||
@ -68,10 +75,13 @@ public class TbMsgTimeseriesNode implements TbNode {
 | 
			
		||||
    private TbContext ctx;
 | 
			
		||||
    private long tenantProfileDefaultStorageTtl;
 | 
			
		||||
 | 
			
		||||
    private PersistenceStrategy latestPersistenceStrategy;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, TbMsgTimeseriesNodeConfiguration.class);
 | 
			
		||||
        this.ctx = ctx;
 | 
			
		||||
        latestPersistenceStrategy = config.getPersistenceConfig().latest();
 | 
			
		||||
        ctx.addTenantProfileListener(this::onTenantProfileUpdate);
 | 
			
		||||
        onTenantProfileUpdate(ctx.getTenantProfile());
 | 
			
		||||
    }
 | 
			
		||||
@ -94,10 +104,18 @@ public class TbMsgTimeseriesNode implements TbNode {
 | 
			
		||||
            ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + src));
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        List<TsKvEntry> tsKvEntryList = new ArrayList<>();
 | 
			
		||||
        List<TsKvEntry> withLatest = new ArrayList<>();
 | 
			
		||||
        List<TsKvEntry> withoutLatest = new ArrayList<>();
 | 
			
		||||
        for (Map.Entry<Long, List<KvEntry>> tsKvEntry : tsKvMap.entrySet()) {
 | 
			
		||||
            for (KvEntry kvEntry : tsKvEntry.getValue()) {
 | 
			
		||||
                tsKvEntryList.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry));
 | 
			
		||||
                TsKvEntry entry = new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry);
 | 
			
		||||
                if (latestPersistenceStrategy.shouldPersist(msg.getOriginator().getId(), entry)) {
 | 
			
		||||
                    log.info("Persisting entry: {}", entry);
 | 
			
		||||
                    withLatest.add(entry);
 | 
			
		||||
                } else {
 | 
			
		||||
                    log.info("Skipping entry: {}", entry);
 | 
			
		||||
                    withoutLatest.add(entry);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        String ttlValue = msg.getMetaData().getValue("TTL");
 | 
			
		||||
@ -105,15 +123,33 @@ public class TbMsgTimeseriesNode implements TbNode {
 | 
			
		||||
        if (ttl == 0L) {
 | 
			
		||||
            ttl = tenantProfileDefaultStorageTtl;
 | 
			
		||||
        }
 | 
			
		||||
        ctx.getTelemetryService().saveTimeseries(TimeseriesSaveRequest.builder()
 | 
			
		||||
 | 
			
		||||
        SettableFuture<Void> withLatestSavedFuture = SettableFuture.create();
 | 
			
		||||
        TimeseriesSaveRequest saveWithLatestRequest = TimeseriesSaveRequest.builder()
 | 
			
		||||
                .tenantId(ctx.getTenantId())
 | 
			
		||||
                .customerId(msg.getCustomerId())
 | 
			
		||||
                .entityId(msg.getOriginator())
 | 
			
		||||
                .entries(tsKvEntryList)
 | 
			
		||||
                .entries(withLatest)
 | 
			
		||||
                .ttl(ttl)
 | 
			
		||||
                .saveLatest(!config.isSkipLatestPersistence())
 | 
			
		||||
                .callback(new TelemetryNodeCallback(ctx, msg))
 | 
			
		||||
                .build());
 | 
			
		||||
                .saveLatest(true)
 | 
			
		||||
                .future(withLatestSavedFuture)
 | 
			
		||||
                .build();
 | 
			
		||||
        ctx.getTelemetryService().saveTimeseries(saveWithLatestRequest);
 | 
			
		||||
 | 
			
		||||
        SettableFuture<Void> withoutLatestSavedFuture = SettableFuture.create();
 | 
			
		||||
        TimeseriesSaveRequest saveWithoutLatestRequest = TimeseriesSaveRequest.builder()
 | 
			
		||||
                .tenantId(ctx.getTenantId())
 | 
			
		||||
                .customerId(msg.getCustomerId())
 | 
			
		||||
                .entityId(msg.getOriginator())
 | 
			
		||||
                .entries(withoutLatest)
 | 
			
		||||
                .ttl(ttl)
 | 
			
		||||
                .saveLatest(false)
 | 
			
		||||
                .future(withoutLatestSavedFuture)
 | 
			
		||||
                .build();
 | 
			
		||||
        ctx.getTelemetryService().saveTimeseries(saveWithoutLatestRequest);
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<List<Void>> bothSavedFuture = Futures.allAsList(withLatestSavedFuture, withoutLatestSavedFuture);
 | 
			
		||||
        DonAsynchron.withCallback(bothSavedFuture, success -> ctx.tellSuccess(msg), failure -> ctx.tellFailure(msg, failure));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static long computeTs(TbMsg msg, boolean ignoreMetadataTs) {
 | 
			
		||||
 | 
			
		||||
@ -15,6 +15,7 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.telemetry;
 | 
			
		||||
 | 
			
		||||
import lombok.Builder;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.rule.engine.api.NodeConfiguration;
 | 
			
		||||
 | 
			
		||||
@ -22,15 +23,22 @@ import org.thingsboard.rule.engine.api.NodeConfiguration;
 | 
			
		||||
public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration<TbMsgTimeseriesNodeConfiguration> {
 | 
			
		||||
 | 
			
		||||
    private long defaultTTL;
 | 
			
		||||
    private boolean skipLatestPersistence;
 | 
			
		||||
    private boolean useServerTs;
 | 
			
		||||
    private PersistenceConfig persistenceConfig;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbMsgTimeseriesNodeConfiguration defaultConfiguration() {
 | 
			
		||||
        TbMsgTimeseriesNodeConfiguration configuration = new TbMsgTimeseriesNodeConfiguration();
 | 
			
		||||
        configuration.setDefaultTTL(0L);
 | 
			
		||||
        configuration.setSkipLatestPersistence(false);
 | 
			
		||||
        configuration.setUseServerTs(false);
 | 
			
		||||
        configuration.setPersistenceConfig(PersistenceConfig.builder()
 | 
			
		||||
                .latest(SaveEveryMessagePersistenceStrategy.INSTANCE)
 | 
			
		||||
                .build());
 | 
			
		||||
        return configuration;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Builder
 | 
			
		||||
    record PersistenceConfig(PersistenceStrategy latest) {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -88,7 +88,7 @@ public class TbMsgTimeseriesNodeTest {
 | 
			
		||||
    @Test
 | 
			
		||||
    public void verifyDefaultConfig() {
 | 
			
		||||
        assertThat(config.getDefaultTTL()).isEqualTo(0L);
 | 
			
		||||
        assertThat(config.isSkipLatestPersistence()).isFalse();
 | 
			
		||||
        // assertThat(config.isSkipLatestPersistence()).isFalse();
 | 
			
		||||
        assertThat(config.isUseServerTs()).isFalse();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -162,7 +162,7 @@ public class TbMsgTimeseriesNodeTest {
 | 
			
		||||
    public void givenSkipLatestPersistenceIsTrueAndTtlFromConfig_whenOnMsg_thenSaveTimeseriesUsingTtlFromConfig() throws TbNodeException {
 | 
			
		||||
        long ttlFromConfig = 5L;
 | 
			
		||||
        config.setDefaultTTL(ttlFromConfig);
 | 
			
		||||
        config.setSkipLatestPersistence(true);
 | 
			
		||||
        // config.setSkipLatestPersistence(true);
 | 
			
		||||
        init();
 | 
			
		||||
 | 
			
		||||
        String data = """
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user