Change wording from persistence to processing on backend
This commit is contained in:
		
							parent
							
								
									8472e4f74e
								
							
						
					
					
						commit
						12a8c070a6
					
				@ -37,7 +37,7 @@
 | 
			
		||||
        "configuration": {
 | 
			
		||||
          "defaultTTL": 0,
 | 
			
		||||
          "useServerTs": false,
 | 
			
		||||
          "persistenceSettings": {
 | 
			
		||||
          "processingSettings": {
 | 
			
		||||
            "type": "ON_EVERY_MESSAGE"
 | 
			
		||||
          }
 | 
			
		||||
        },
 | 
			
		||||
 | 
			
		||||
@ -23,7 +23,7 @@
 | 
			
		||||
        "configuration": {
 | 
			
		||||
          "defaultTTL": 0,
 | 
			
		||||
          "useServerTs": false,
 | 
			
		||||
          "persistenceSettings": {
 | 
			
		||||
          "processingSettings": {
 | 
			
		||||
            "type": "ON_EVERY_MESSAGE"
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -22,7 +22,7 @@
 | 
			
		||||
        "configuration": {
 | 
			
		||||
          "defaultTTL": 0,
 | 
			
		||||
          "useServerTs": false,
 | 
			
		||||
          "persistenceSettings": {
 | 
			
		||||
          "processingSettings": {
 | 
			
		||||
            "type": "ON_EVERY_MESSAGE"
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -225,7 +225,7 @@ DO $$
 | 
			
		||||
            SET configuration = (
 | 
			
		||||
                (configuration::jsonb - 'skipLatestPersistence')
 | 
			
		||||
                    || jsonb_build_object(
 | 
			
		||||
                        'persistenceSettings', jsonb_build_object(
 | 
			
		||||
                        'processingSettings', jsonb_build_object(
 | 
			
		||||
                                'type',       'ADVANCED',
 | 
			
		||||
                                'timeseries', jsonb_build_object('type', 'ON_EVERY_MESSAGE'),
 | 
			
		||||
                                'latest',     jsonb_build_object('type', 'SKIP'),
 | 
			
		||||
@ -242,7 +242,7 @@ DO $$
 | 
			
		||||
            SET configuration = (
 | 
			
		||||
                (configuration::jsonb - 'skipLatestPersistence')
 | 
			
		||||
                    || jsonb_build_object(
 | 
			
		||||
                        'persistenceSettings', jsonb_build_object(
 | 
			
		||||
                        'processingSettings', jsonb_build_object(
 | 
			
		||||
                                'type', 'ON_EVERY_MESSAGE'
 | 
			
		||||
                                               )
 | 
			
		||||
                       )
 | 
			
		||||
 | 
			
		||||
@ -25,7 +25,7 @@
 | 
			
		||||
        "configuration": {
 | 
			
		||||
          "defaultTTL": 0,
 | 
			
		||||
          "useServerTs": false,
 | 
			
		||||
          "persistenceSettings": {
 | 
			
		||||
          "processingSettings": {
 | 
			
		||||
            "type": "ON_EVERY_MESSAGE"
 | 
			
		||||
          }
 | 
			
		||||
        },
 | 
			
		||||
@ -281,7 +281,7 @@
 | 
			
		||||
        "configuration": {
 | 
			
		||||
          "defaultTTL": 0,
 | 
			
		||||
          "useServerTs": false,
 | 
			
		||||
          "persistenceSettings": {
 | 
			
		||||
          "processingSettings": {
 | 
			
		||||
            "type": "ON_EVERY_MESSAGE"
 | 
			
		||||
          }
 | 
			
		||||
        },
 | 
			
		||||
@ -319,7 +319,7 @@
 | 
			
		||||
        "configuration": {
 | 
			
		||||
          "defaultTTL": 180,
 | 
			
		||||
          "useServerTs": false,
 | 
			
		||||
          "persistenceSettings": {
 | 
			
		||||
          "processingSettings": {
 | 
			
		||||
            "type": "ON_EVERY_MESSAGE"
 | 
			
		||||
          }
 | 
			
		||||
        },
 | 
			
		||||
 | 
			
		||||
@ -40,7 +40,7 @@
 | 
			
		||||
      "configuration": {
 | 
			
		||||
        "defaultTTL": 0,
 | 
			
		||||
        "useServerTs": false,
 | 
			
		||||
        "persistenceSettings": {
 | 
			
		||||
        "processingSettings": {
 | 
			
		||||
          "type": "ON_EVERY_MESSAGE"
 | 
			
		||||
        }
 | 
			
		||||
      },
 | 
			
		||||
 | 
			
		||||
@ -27,7 +27,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.telemetry.strategy.PersistenceStrategy;
 | 
			
		||||
import org.thingsboard.rule.engine.telemetry.strategy.ProcessingStrategy;
 | 
			
		||||
import org.thingsboard.server.common.adaptor.JsonConverter;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.TenantProfile;
 | 
			
		||||
@ -45,11 +45,11 @@ import java.util.Map;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings;
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced;
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Deduplicate;
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage;
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.WebSocketsOnly;
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.ProcessingSettings;
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced;
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Deduplicate;
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.ProcessingSettings.OnEveryMessage;
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.ProcessingSettings.WebSocketsOnly;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
@ -58,7 +58,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_RE
 | 
			
		||||
        name = "save time series",
 | 
			
		||||
        configClazz = TbMsgTimeseriesNodeConfiguration.class,
 | 
			
		||||
        nodeDescription = """
 | 
			
		||||
                Saves time series data with a configurable TTL and according to configured persistence strategies.
 | 
			
		||||
                Saves time series data with a configurable TTL and according to configured processing strategies.
 | 
			
		||||
                """,
 | 
			
		||||
        nodeDetails = """
 | 
			
		||||
                Node performs three <strong>actions:</strong>
 | 
			
		||||
@ -68,14 +68,14 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_RE
 | 
			
		||||
                  <li><strong>WebSockets:</strong> notify WebSockets subscriptions about time series data updates.</li>
 | 
			
		||||
                </ul>
 | 
			
		||||
                
 | 
			
		||||
                For each <em>action</em>, three <strong>persistence strategies</strong> are available:
 | 
			
		||||
                For each <em>action</em>, three <strong>processing strategies</strong> are available:
 | 
			
		||||
                <ul>
 | 
			
		||||
                  <li><strong>On every message:</strong> perform the action for every message.</li>
 | 
			
		||||
                  <li><strong>Deduplicate:</strong> perform the action only for the first message from a particular originator within a configurable interval.</li>
 | 
			
		||||
                  <li><strong>Skip:</strong> never perform the action.</li>
 | 
			
		||||
                </ul>
 | 
			
		||||
                
 | 
			
		||||
                <strong>Persistence strategies</strong> are configured using <em>persistence settings</em>, which support two modes:
 | 
			
		||||
                <strong>Processing strategies</strong> are configured using <em>processing settings</em>, which support two modes:
 | 
			
		||||
                <ul>
 | 
			
		||||
                  <li><strong>Basic</strong>
 | 
			
		||||
                    <ul>
 | 
			
		||||
@ -110,7 +110,7 @@ public class TbMsgTimeseriesNode implements TbNode {
 | 
			
		||||
    private TbContext ctx;
 | 
			
		||||
    private long tenantProfileDefaultStorageTtl;
 | 
			
		||||
 | 
			
		||||
    private PersistenceSettings persistenceSettings;
 | 
			
		||||
    private ProcessingSettings processingSettings;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
@ -118,7 +118,7 @@ public class TbMsgTimeseriesNode implements TbNode {
 | 
			
		||||
        this.ctx = ctx;
 | 
			
		||||
        ctx.addTenantProfileListener(this::onTenantProfileUpdate);
 | 
			
		||||
        onTenantProfileUpdate(ctx.getTenantProfile());
 | 
			
		||||
        persistenceSettings = config.getPersistenceSettings();
 | 
			
		||||
        processingSettings = config.getProcessingSettings();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void onTenantProfileUpdate(TenantProfile tenantProfile) {
 | 
			
		||||
@ -175,25 +175,25 @@ public class TbMsgTimeseriesNode implements TbNode {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TimeseriesSaveRequest.Strategy determineSaveStrategy(long ts, UUID originatorUuid) {
 | 
			
		||||
        if (persistenceSettings instanceof OnEveryMessage) {
 | 
			
		||||
        if (processingSettings instanceof OnEveryMessage) {
 | 
			
		||||
            return TimeseriesSaveRequest.Strategy.SAVE_ALL;
 | 
			
		||||
        }
 | 
			
		||||
        if (persistenceSettings instanceof WebSocketsOnly) {
 | 
			
		||||
        if (processingSettings instanceof WebSocketsOnly) {
 | 
			
		||||
            return TimeseriesSaveRequest.Strategy.WS_ONLY;
 | 
			
		||||
        }
 | 
			
		||||
        if (persistenceSettings instanceof Deduplicate deduplicate) {
 | 
			
		||||
            boolean isFirstMsgInInterval = deduplicate.getPersistenceStrategy().shouldPersist(ts, originatorUuid);
 | 
			
		||||
        if (processingSettings instanceof Deduplicate deduplicate) {
 | 
			
		||||
            boolean isFirstMsgInInterval = deduplicate.getProcessingStrategy().shouldProcess(ts, originatorUuid);
 | 
			
		||||
            return isFirstMsgInInterval ? TimeseriesSaveRequest.Strategy.SAVE_ALL : TimeseriesSaveRequest.Strategy.SKIP_ALL;
 | 
			
		||||
        }
 | 
			
		||||
        if (persistenceSettings instanceof Advanced advanced) {
 | 
			
		||||
        if (processingSettings instanceof Advanced advanced) {
 | 
			
		||||
            return new TimeseriesSaveRequest.Strategy(
 | 
			
		||||
                    advanced.timeseries().shouldPersist(ts, originatorUuid),
 | 
			
		||||
                    advanced.latest().shouldPersist(ts, originatorUuid),
 | 
			
		||||
                    advanced.webSockets().shouldPersist(ts, originatorUuid)
 | 
			
		||||
                    advanced.timeseries().shouldProcess(ts, originatorUuid),
 | 
			
		||||
                    advanced.latest().shouldProcess(ts, originatorUuid),
 | 
			
		||||
                    advanced.webSockets().shouldProcess(ts, originatorUuid)
 | 
			
		||||
            );
 | 
			
		||||
        }
 | 
			
		||||
        // should not happen
 | 
			
		||||
        throw new IllegalArgumentException("Unknown persistence settings type: " + persistenceSettings.getClass().getSimpleName());
 | 
			
		||||
        throw new IllegalArgumentException("Unknown processing settings type: " + processingSettings.getClass().getSimpleName());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -209,14 +209,14 @@ public class TbMsgTimeseriesNode implements TbNode {
 | 
			
		||||
                hasChanges = true;
 | 
			
		||||
                JsonNode skipLatestPersistence = oldConfiguration.get("skipLatestPersistence");
 | 
			
		||||
                if (skipLatestPersistence != null && "true".equals(skipLatestPersistence.asText())) {
 | 
			
		||||
                    var skipLatestPersistenceSettings = new Advanced(
 | 
			
		||||
                            PersistenceStrategy.onEveryMessage(),
 | 
			
		||||
                            PersistenceStrategy.skip(),
 | 
			
		||||
                            PersistenceStrategy.onEveryMessage()
 | 
			
		||||
                    var skipLatestProcessingSettings = new Advanced(
 | 
			
		||||
                            ProcessingStrategy.onEveryMessage(),
 | 
			
		||||
                            ProcessingStrategy.skip(),
 | 
			
		||||
                            ProcessingStrategy.onEveryMessage()
 | 
			
		||||
                    );
 | 
			
		||||
                    ((ObjectNode) oldConfiguration).set("persistenceSettings", JacksonUtil.valueToTree(skipLatestPersistenceSettings));
 | 
			
		||||
                    ((ObjectNode) oldConfiguration).set("processingSettings", JacksonUtil.valueToTree(skipLatestProcessingSettings));
 | 
			
		||||
                } else {
 | 
			
		||||
                    ((ObjectNode) oldConfiguration).set("persistenceSettings", JacksonUtil.valueToTree(new OnEveryMessage()));
 | 
			
		||||
                    ((ObjectNode) oldConfiguration).set("processingSettings", JacksonUtil.valueToTree(new OnEveryMessage()));
 | 
			
		||||
                }
 | 
			
		||||
                ((ObjectNode) oldConfiguration).remove("skipLatestPersistence");
 | 
			
		||||
                break;
 | 
			
		||||
 | 
			
		||||
@ -24,14 +24,14 @@ import jakarta.validation.constraints.NotNull;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import org.thingsboard.rule.engine.api.NodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.telemetry.strategy.PersistenceStrategy;
 | 
			
		||||
import org.thingsboard.rule.engine.telemetry.strategy.ProcessingStrategy;
 | 
			
		||||
 | 
			
		||||
import java.util.Objects;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced;
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Deduplicate;
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage;
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.PersistenceSettings.WebSocketsOnly;
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced;
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Deduplicate;
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.ProcessingSettings.OnEveryMessage;
 | 
			
		||||
import static org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration.ProcessingSettings.WebSocketsOnly;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration<TbMsgTimeseriesNodeConfiguration> {
 | 
			
		||||
@ -39,14 +39,14 @@ public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration<TbMsg
 | 
			
		||||
    private long defaultTTL;
 | 
			
		||||
    private boolean useServerTs;
 | 
			
		||||
    @NotNull
 | 
			
		||||
    private PersistenceSettings persistenceSettings;
 | 
			
		||||
    private TbMsgTimeseriesNodeConfiguration.ProcessingSettings processingSettings;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbMsgTimeseriesNodeConfiguration defaultConfiguration() {
 | 
			
		||||
        TbMsgTimeseriesNodeConfiguration configuration = new TbMsgTimeseriesNodeConfiguration();
 | 
			
		||||
        configuration.setDefaultTTL(0L);
 | 
			
		||||
        configuration.setUseServerTs(false);
 | 
			
		||||
        configuration.setPersistenceSettings(new OnEveryMessage());
 | 
			
		||||
        configuration.setProcessingSettings(new OnEveryMessage());
 | 
			
		||||
        return configuration;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -61,29 +61,29 @@ public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration<TbMsg
 | 
			
		||||
            @JsonSubTypes.Type(value = Deduplicate.class, name = "DEDUPLICATE"),
 | 
			
		||||
            @JsonSubTypes.Type(value = Advanced.class, name = "ADVANCED")
 | 
			
		||||
    })
 | 
			
		||||
    sealed interface PersistenceSettings permits OnEveryMessage, Deduplicate, WebSocketsOnly, Advanced {
 | 
			
		||||
    sealed interface ProcessingSettings permits OnEveryMessage, Deduplicate, WebSocketsOnly, Advanced {
 | 
			
		||||
 | 
			
		||||
        record OnEveryMessage() implements PersistenceSettings {}
 | 
			
		||||
        record OnEveryMessage() implements ProcessingSettings {}
 | 
			
		||||
 | 
			
		||||
        record WebSocketsOnly() implements PersistenceSettings {}
 | 
			
		||||
        record WebSocketsOnly() implements ProcessingSettings {}
 | 
			
		||||
 | 
			
		||||
        @Getter
 | 
			
		||||
        final class Deduplicate implements PersistenceSettings {
 | 
			
		||||
        final class Deduplicate implements ProcessingSettings {
 | 
			
		||||
 | 
			
		||||
            private final int deduplicationIntervalSecs;
 | 
			
		||||
 | 
			
		||||
            @JsonIgnore
 | 
			
		||||
            private final PersistenceStrategy persistenceStrategy;
 | 
			
		||||
            private final ProcessingStrategy processingStrategy;
 | 
			
		||||
 | 
			
		||||
            @JsonCreator
 | 
			
		||||
            Deduplicate(@JsonProperty("deduplicationIntervalSecs") int deduplicationIntervalSecs) {
 | 
			
		||||
                this.deduplicationIntervalSecs = deduplicationIntervalSecs;
 | 
			
		||||
                persistenceStrategy = PersistenceStrategy.deduplicate(deduplicationIntervalSecs);
 | 
			
		||||
                processingStrategy = ProcessingStrategy.deduplicate(deduplicationIntervalSecs);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        record Advanced(PersistenceStrategy timeseries, PersistenceStrategy latest, PersistenceStrategy webSockets) implements PersistenceSettings {
 | 
			
		||||
        record Advanced(ProcessingStrategy timeseries, ProcessingStrategy latest, ProcessingStrategy webSockets) implements ProcessingSettings {
 | 
			
		||||
 | 
			
		||||
            public Advanced {
 | 
			
		||||
                Objects.requireNonNull(timeseries);
 | 
			
		||||
 | 
			
		||||
@ -26,7 +26,7 @@ import java.time.Duration;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
final class DeduplicatePersistenceStrategy implements PersistenceStrategy {
 | 
			
		||||
final class DeduplicateProcessingStrategy implements ProcessingStrategy {
 | 
			
		||||
 | 
			
		||||
    private static final int MIN_DEDUPLICATION_INTERVAL_SECS = 1;
 | 
			
		||||
    private static final int MAX_DEDUPLICATION_INTERVAL_SECS = (int) Duration.ofDays(1L).toSeconds();
 | 
			
		||||
@ -42,7 +42,7 @@ final class DeduplicatePersistenceStrategy implements PersistenceStrategy {
 | 
			
		||||
    private final LoadingCache<Long, Set<UUID>> deduplicationCache;
 | 
			
		||||
 | 
			
		||||
    @JsonCreator
 | 
			
		||||
    public DeduplicatePersistenceStrategy(@JsonProperty("deduplicationIntervalSecs") int deduplicationIntervalSecs) {
 | 
			
		||||
    public DeduplicateProcessingStrategy(@JsonProperty("deduplicationIntervalSecs") int deduplicationIntervalSecs) {
 | 
			
		||||
        if (deduplicationIntervalSecs < MIN_DEDUPLICATION_INTERVAL_SECS || deduplicationIntervalSecs > MAX_DEDUPLICATION_INTERVAL_SECS) {
 | 
			
		||||
            throw new IllegalArgumentException("Deduplication interval must be at least " + MIN_DEDUPLICATION_INTERVAL_SECS + " second(s) " +
 | 
			
		||||
                    "and at most " + MAX_DEDUPLICATION_INTERVAL_SECS + " second(s), was " + deduplicationIntervalSecs + " second(s)");
 | 
			
		||||
@ -81,7 +81,7 @@ final class DeduplicatePersistenceStrategy implements PersistenceStrategy {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean shouldPersist(long ts, UUID originatorUuid) {
 | 
			
		||||
    public boolean shouldProcess(long ts, UUID originatorUuid) {
 | 
			
		||||
        long intervalNumber = ts / deduplicationIntervalMillis;
 | 
			
		||||
        return deduplicationCache.get(intervalNumber).add(originatorUuid);
 | 
			
		||||
    }
 | 
			
		||||
@ -19,19 +19,19 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
final class OnEveryMessagePersistenceStrategy implements PersistenceStrategy {
 | 
			
		||||
final class OnEveryMessageProcessingStrategy implements ProcessingStrategy {
 | 
			
		||||
 | 
			
		||||
    private static final OnEveryMessagePersistenceStrategy INSTANCE = new OnEveryMessagePersistenceStrategy();
 | 
			
		||||
    private static final OnEveryMessageProcessingStrategy INSTANCE = new OnEveryMessageProcessingStrategy();
 | 
			
		||||
 | 
			
		||||
    private OnEveryMessagePersistenceStrategy() {}
 | 
			
		||||
    private OnEveryMessageProcessingStrategy() {}
 | 
			
		||||
 | 
			
		||||
    @JsonCreator
 | 
			
		||||
    public static OnEveryMessagePersistenceStrategy getInstance() {
 | 
			
		||||
    public static OnEveryMessageProcessingStrategy getInstance() {
 | 
			
		||||
        return INSTANCE;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean shouldPersist(long ts, UUID originatorUuid) {
 | 
			
		||||
    public boolean shouldProcess(long ts, UUID originatorUuid) {
 | 
			
		||||
        return true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -26,24 +26,24 @@ import java.util.UUID;
 | 
			
		||||
        property = "type"
 | 
			
		||||
)
 | 
			
		||||
@JsonSubTypes({
 | 
			
		||||
        @JsonSubTypes.Type(value = OnEveryMessagePersistenceStrategy.class, name = "ON_EVERY_MESSAGE"),
 | 
			
		||||
        @JsonSubTypes.Type(value = DeduplicatePersistenceStrategy.class, name = "DEDUPLICATE"),
 | 
			
		||||
        @JsonSubTypes.Type(value = SkipPersistenceStrategy.class, name = "SKIP")
 | 
			
		||||
        @JsonSubTypes.Type(value = OnEveryMessageProcessingStrategy.class, name = "ON_EVERY_MESSAGE"),
 | 
			
		||||
        @JsonSubTypes.Type(value = DeduplicateProcessingStrategy.class, name = "DEDUPLICATE"),
 | 
			
		||||
        @JsonSubTypes.Type(value = SkipProcessingStrategy.class, name = "SKIP")
 | 
			
		||||
})
 | 
			
		||||
public sealed interface PersistenceStrategy permits OnEveryMessagePersistenceStrategy, DeduplicatePersistenceStrategy, SkipPersistenceStrategy {
 | 
			
		||||
public sealed interface ProcessingStrategy permits OnEveryMessageProcessingStrategy, DeduplicateProcessingStrategy, SkipProcessingStrategy {
 | 
			
		||||
 | 
			
		||||
    static PersistenceStrategy onEveryMessage() {
 | 
			
		||||
        return OnEveryMessagePersistenceStrategy.getInstance();
 | 
			
		||||
    static ProcessingStrategy onEveryMessage() {
 | 
			
		||||
        return OnEveryMessageProcessingStrategy.getInstance();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    static PersistenceStrategy deduplicate(int deduplicationIntervalSecs) {
 | 
			
		||||
        return new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
 | 
			
		||||
    static ProcessingStrategy deduplicate(int deduplicationIntervalSecs) {
 | 
			
		||||
        return new DeduplicateProcessingStrategy(deduplicationIntervalSecs);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    static PersistenceStrategy skip() {
 | 
			
		||||
        return SkipPersistenceStrategy.getInstance();
 | 
			
		||||
    static ProcessingStrategy skip() {
 | 
			
		||||
        return SkipProcessingStrategy.getInstance();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    boolean shouldPersist(long ts, UUID originatorUuid);
 | 
			
		||||
    boolean shouldProcess(long ts, UUID originatorUuid);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -19,19 +19,19 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
final class SkipPersistenceStrategy implements PersistenceStrategy {
 | 
			
		||||
final class SkipProcessingStrategy implements ProcessingStrategy {
 | 
			
		||||
 | 
			
		||||
    private static final SkipPersistenceStrategy INSTANCE = new SkipPersistenceStrategy();
 | 
			
		||||
    private static final SkipProcessingStrategy INSTANCE = new SkipProcessingStrategy();
 | 
			
		||||
 | 
			
		||||
    private SkipPersistenceStrategy() {}
 | 
			
		||||
    private SkipProcessingStrategy() {}
 | 
			
		||||
 | 
			
		||||
    @JsonCreator
 | 
			
		||||
    public static SkipPersistenceStrategy getInstance() {
 | 
			
		||||
    public static SkipProcessingStrategy getInstance() {
 | 
			
		||||
        return INSTANCE;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean shouldPersist(long ts, UUID originatorUuid) {
 | 
			
		||||
    public boolean shouldProcess(long ts, UUID originatorUuid) {
 | 
			
		||||
        return false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -34,7 +34,7 @@ import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.telemetry.strategy.PersistenceStrategy;
 | 
			
		||||
import org.thingsboard.rule.engine.telemetry.strategy.ProcessingStrategy;
 | 
			
		||||
import org.thingsboard.server.common.adaptor.JsonConverter;
 | 
			
		||||
import org.thingsboard.server.common.data.TenantProfile;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
@ -110,7 +110,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
    @Test
 | 
			
		||||
    public void verifyDefaultConfig() {
 | 
			
		||||
        assertThat(config.getDefaultTTL()).isEqualTo(0L);
 | 
			
		||||
        assertThat(config.getPersistenceSettings()).isInstanceOf(TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage.class);
 | 
			
		||||
        assertThat(config.getProcessingSettings()).isInstanceOf(TbMsgTimeseriesNodeConfiguration.ProcessingSettings.OnEveryMessage.class);
 | 
			
		||||
        assertThat(config.isUseServerTs()).isFalse();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -124,14 +124,14 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenPersistenceSettingsAreNull_whenValidatingConstraints_thenThrowsException() {
 | 
			
		||||
    public void givenProcessingSettingsAreNull_whenValidatingConstraints_thenThrowsException() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setPersistenceSettings(null);
 | 
			
		||||
        config.setProcessingSettings(null);
 | 
			
		||||
 | 
			
		||||
        // WHEN-THEN
 | 
			
		||||
        assertThatThrownBy(() -> ConstraintValidator.validateFields(config))
 | 
			
		||||
                .isInstanceOf(DataValidationException.class)
 | 
			
		||||
                .hasMessage("Validation error: persistenceSettings must not be null");
 | 
			
		||||
                .hasMessage("Validation error: processingSettings must not be null");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @ParameterizedTest
 | 
			
		||||
@ -216,15 +216,15 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenSkipLatestPersistenceSettingsAndTtlFromConfig_whenOnMsg_thenSaveTimeseriesUsingTtlFromConfig() throws TbNodeException {
 | 
			
		||||
    public void givenSkipLatestProcessingSettingsAndTtlFromConfig_whenOnMsg_thenSaveTimeseriesUsingTtlFromConfig() throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setDefaultTTL(10L);
 | 
			
		||||
 | 
			
		||||
        var timeseriesStrategy = PersistenceStrategy.onEveryMessage();
 | 
			
		||||
        var latestStrategy = PersistenceStrategy.skip();
 | 
			
		||||
        var webSockets = PersistenceStrategy.onEveryMessage();
 | 
			
		||||
        var persistenceSettings = new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced(timeseriesStrategy, latestStrategy, webSockets);
 | 
			
		||||
        config.setPersistenceSettings(persistenceSettings);
 | 
			
		||||
        var timeseriesStrategy = ProcessingStrategy.onEveryMessage();
 | 
			
		||||
        var latestStrategy = ProcessingStrategy.skip();
 | 
			
		||||
        var webSockets = ProcessingStrategy.onEveryMessage();
 | 
			
		||||
        var processingSettings = new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced(timeseriesStrategy, latestStrategy, webSockets);
 | 
			
		||||
        config.setProcessingSettings(processingSettings);
 | 
			
		||||
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
 | 
			
		||||
@ -333,9 +333,9 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenOnEveryMessagePersistenceSettingsAndSameMessageTwoTimes_whenOnMsg_thenPersistSameMessageTwoTimes() throws TbNodeException {
 | 
			
		||||
    public void givenOnEveryMessageProcessingSettingsAndSameMessageTwoTimes_whenOnMsg_thenPersistSameMessageTwoTimes() throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.OnEveryMessage());
 | 
			
		||||
        config.setProcessingSettings(new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.OnEveryMessage());
 | 
			
		||||
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
 | 
			
		||||
@ -368,9 +368,9 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenDeduplicatePersistenceSettingsAndSameMessageTwoTimes_whenOnMsg_thenPersistThisMessageOnlyFirstTime() throws TbNodeException {
 | 
			
		||||
    public void givenDeduplicateProcessingSettingsAndSameMessageTwoTimes_whenOnMsg_thenPersistThisMessageOnlyFirstTime() throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Deduplicate(10));
 | 
			
		||||
        config.setProcessingSettings(new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Deduplicate(10));
 | 
			
		||||
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
 | 
			
		||||
@ -403,9 +403,9 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenWebsocketsOnlyPersistenceSettingsAndSameMessageTwoTimes_whenOnMsg_thenSendsOnlyWsUpdateTwoTimes() throws TbNodeException {
 | 
			
		||||
    public void givenWebSocketsOnlyProcessingSettingsAndSameMessageTwoTimes_whenOnMsg_thenSendsOnlyWsUpdateTwoTimes() throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.WebSocketsOnly());
 | 
			
		||||
        config.setProcessingSettings(new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.WebSocketsOnly());
 | 
			
		||||
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
 | 
			
		||||
@ -438,12 +438,12 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenAdvancedPersistenceSettingsWithOnEveryMessageStrategiesForAllActionsAndSameMessageTwoTimes_whenOnMsg_thenPersistSameMessageTwoTimes() throws TbNodeException {
 | 
			
		||||
    public void givenAdvancedProcessingSettingsWithOnEveryMessageStrategiesForAllActionsAndSameMessageTwoTimes_whenOnMsg_thenPersistSameMessageTwoTimes() throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced(
 | 
			
		||||
                PersistenceStrategy.onEveryMessage(),
 | 
			
		||||
                PersistenceStrategy.onEveryMessage(),
 | 
			
		||||
                PersistenceStrategy.onEveryMessage()
 | 
			
		||||
        config.setProcessingSettings(new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced(
 | 
			
		||||
                ProcessingStrategy.onEveryMessage(),
 | 
			
		||||
                ProcessingStrategy.onEveryMessage(),
 | 
			
		||||
                ProcessingStrategy.onEveryMessage()
 | 
			
		||||
        ));
 | 
			
		||||
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
@ -477,12 +477,12 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenAdvancedPersistenceSettingsWithDifferentDeduplicateStrategyForEachAction_whenOnMsg_thenEvaluatesStrategiesForEachActionsIndependently() throws TbNodeException {
 | 
			
		||||
    public void givenAdvancedProcessingSettingsWithDifferentDeduplicateStrategyForEachAction_whenOnMsg_thenEvaluatesStrategiesForEachActionsIndependently() throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced(
 | 
			
		||||
                PersistenceStrategy.deduplicate(1),
 | 
			
		||||
                PersistenceStrategy.deduplicate(2),
 | 
			
		||||
                PersistenceStrategy.deduplicate(3)
 | 
			
		||||
        config.setProcessingSettings(new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced(
 | 
			
		||||
                ProcessingStrategy.deduplicate(1),
 | 
			
		||||
                ProcessingStrategy.deduplicate(2),
 | 
			
		||||
                ProcessingStrategy.deduplicate(3)
 | 
			
		||||
        ));
 | 
			
		||||
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
@ -528,12 +528,12 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenAdvancedPersistenceSettingsWithSkipStrategiesForAllActionsAndSameMessageTwoTimes_whenOnMsg_thenSkipsSameMessageTwoTimes() throws TbNodeException {
 | 
			
		||||
    public void givenAdvancedProcessingSettingsWithSkipStrategiesForAllActionsAndSameMessageTwoTimes_whenOnMsg_thenSkipsSameMessageTwoTimes() throws TbNodeException {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        config.setPersistenceSettings(new TbMsgTimeseriesNodeConfiguration.PersistenceSettings.Advanced(
 | 
			
		||||
                PersistenceStrategy.skip(),
 | 
			
		||||
                PersistenceStrategy.skip(),
 | 
			
		||||
                PersistenceStrategy.skip()
 | 
			
		||||
        config.setProcessingSettings(new TbMsgTimeseriesNodeConfiguration.ProcessingSettings.Advanced(
 | 
			
		||||
                ProcessingStrategy.skip(),
 | 
			
		||||
                ProcessingStrategy.skip(),
 | 
			
		||||
                ProcessingStrategy.skip()
 | 
			
		||||
        ));
 | 
			
		||||
 | 
			
		||||
        node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
@ -577,7 +577,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
                                {
 | 
			
		||||
                                    "defaultTTL": 0,
 | 
			
		||||
                                    "useServerTs": false,
 | 
			
		||||
                                    "persistenceSettings": {
 | 
			
		||||
                                    "processingSettings": {
 | 
			
		||||
                                        "type": "ON_EVERY_MESSAGE"
 | 
			
		||||
                                    }
 | 
			
		||||
                                }"""),
 | 
			
		||||
@ -591,7 +591,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
                                {
 | 
			
		||||
                                    "defaultTTL": 0,
 | 
			
		||||
                                    "useServerTs": false,
 | 
			
		||||
                                    "persistenceSettings": {
 | 
			
		||||
                                    "processingSettings": {
 | 
			
		||||
                                        "type": "ON_EVERY_MESSAGE"
 | 
			
		||||
                                    }
 | 
			
		||||
                                }"""),
 | 
			
		||||
@ -606,7 +606,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
                                {
 | 
			
		||||
                                    "defaultTTL": 0,
 | 
			
		||||
                                    "useServerTs": false,
 | 
			
		||||
                                    "persistenceSettings": {
 | 
			
		||||
                                    "processingSettings": {
 | 
			
		||||
                                        "type": "ON_EVERY_MESSAGE"
 | 
			
		||||
                                    }
 | 
			
		||||
                                }"""),
 | 
			
		||||
@ -621,7 +621,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
                                {
 | 
			
		||||
                                    "defaultTTL": 0,
 | 
			
		||||
                                    "useServerTs": false,
 | 
			
		||||
                                    "persistenceSettings": {
 | 
			
		||||
                                    "processingSettings": {
 | 
			
		||||
                                        "type": "ADVANCED",
 | 
			
		||||
                                        "timeseries": {
 | 
			
		||||
                                            "type": "ON_EVERY_MESSAGE"
 | 
			
		||||
 | 
			
		||||
@ -28,27 +28,27 @@ import java.util.UUID;
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
 | 
			
		||||
 | 
			
		||||
class DeduplicatePersistenceStrategyTest {
 | 
			
		||||
class DeduplicateProcessingStrategyTest {
 | 
			
		||||
 | 
			
		||||
    final int deduplicationIntervalSecs = 10;
 | 
			
		||||
 | 
			
		||||
    DeduplicatePersistenceStrategy strategy;
 | 
			
		||||
    DeduplicateProcessingStrategy strategy;
 | 
			
		||||
 | 
			
		||||
    @BeforeEach
 | 
			
		||||
    void setup() {
 | 
			
		||||
        strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
 | 
			
		||||
        strategy = new DeduplicateProcessingStrategy(deduplicationIntervalSecs);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void shouldThrowWhenDeduplicationIntervalIsLessThanOneSecond() {
 | 
			
		||||
        assertThatThrownBy(() -> new DeduplicatePersistenceStrategy(0))
 | 
			
		||||
        assertThatThrownBy(() -> new DeduplicateProcessingStrategy(0))
 | 
			
		||||
                .isInstanceOf(IllegalArgumentException.class)
 | 
			
		||||
                .hasMessageContaining("Deduplication interval must be at least 1 second(s) and at most 86400 second(s), was 0 second(s)");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void shouldThrowWhenDeduplicationIntervalIsMoreThan24Hours() {
 | 
			
		||||
        assertThatThrownBy(() -> new DeduplicatePersistenceStrategy(86401))
 | 
			
		||||
        assertThatThrownBy(() -> new DeduplicateProcessingStrategy(86401))
 | 
			
		||||
                .isInstanceOf(IllegalArgumentException.class)
 | 
			
		||||
                .hasMessageContaining("Deduplication interval must be at least 1 second(s) and at most 86400 second(s), was 86401 second(s)");
 | 
			
		||||
    }
 | 
			
		||||
@ -59,7 +59,7 @@ class DeduplicatePersistenceStrategyTest {
 | 
			
		||||
        int deduplicationIntervalSecs = 1; // min deduplication interval duration
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
 | 
			
		||||
        strategy = new DeduplicateProcessingStrategy(deduplicationIntervalSecs);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
 | 
			
		||||
@ -76,7 +76,7 @@ class DeduplicatePersistenceStrategyTest {
 | 
			
		||||
        int deduplicationIntervalSecs = (int) Duration.ofHours(1L).toSeconds(); // max deduplication interval duration
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
 | 
			
		||||
        strategy = new DeduplicateProcessingStrategy(deduplicationIntervalSecs);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
 | 
			
		||||
@ -93,7 +93,7 @@ class DeduplicatePersistenceStrategyTest {
 | 
			
		||||
        int deduplicationIntervalSecs = (int) Duration.ofDays(1L).toSeconds(); // max deduplication interval duration
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
 | 
			
		||||
        strategy = new DeduplicateProcessingStrategy(deduplicationIntervalSecs);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
 | 
			
		||||
@ -110,7 +110,7 @@ class DeduplicatePersistenceStrategyTest {
 | 
			
		||||
        int deduplicationIntervalSecs = 1; // min deduplication interval duration
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
 | 
			
		||||
        strategy = new DeduplicateProcessingStrategy(deduplicationIntervalSecs);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
 | 
			
		||||
@ -127,7 +127,7 @@ class DeduplicatePersistenceStrategyTest {
 | 
			
		||||
        int deduplicationIntervalSecs = (int) Duration.ofHours(1L).toSeconds();
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
 | 
			
		||||
        strategy = new DeduplicateProcessingStrategy(deduplicationIntervalSecs);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
 | 
			
		||||
@ -144,7 +144,7 @@ class DeduplicatePersistenceStrategyTest {
 | 
			
		||||
        int deduplicationIntervalSecs = (int) Duration.ofDays(1L).toSeconds(); // max deduplication interval duration
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
 | 
			
		||||
        strategy = new DeduplicateProcessingStrategy(deduplicationIntervalSecs);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
 | 
			
		||||
@ -160,7 +160,7 @@ class DeduplicatePersistenceStrategyTest {
 | 
			
		||||
        long ts = 1_000_000L;
 | 
			
		||||
        UUID originator = UUID.randomUUID();
 | 
			
		||||
 | 
			
		||||
        assertThat(strategy.shouldPersist(ts, originator)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldProcess(ts, originator)).isTrue();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
@ -169,11 +169,11 @@ class DeduplicatePersistenceStrategyTest {
 | 
			
		||||
        UUID originator = UUID.randomUUID();
 | 
			
		||||
 | 
			
		||||
        // Initial call should return true
 | 
			
		||||
        assertThat(strategy.shouldPersist(baseTs, originator)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldProcess(baseTs, originator)).isTrue();
 | 
			
		||||
 | 
			
		||||
        // Subsequent call within the same interval should return false for the same originator
 | 
			
		||||
        long withinSameIntervalTs = baseTs + 1000L;
 | 
			
		||||
        assertThat(strategy.shouldPersist(withinSameIntervalTs, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldProcess(withinSameIntervalTs, originator)).isFalse();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
@ -183,12 +183,12 @@ class DeduplicatePersistenceStrategyTest {
 | 
			
		||||
        UUID originator2 = UUID.randomUUID();
 | 
			
		||||
 | 
			
		||||
        // First call for different originators in the same interval should return true independently
 | 
			
		||||
        assertThat(strategy.shouldPersist(baseTs, originator1)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldPersist(baseTs, originator2)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldProcess(baseTs, originator1)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldProcess(baseTs, originator2)).isTrue();
 | 
			
		||||
 | 
			
		||||
        // Subsequent calls for the same originators within the same interval should return false
 | 
			
		||||
        assertThat(strategy.shouldPersist(baseTs + 500L, originator1)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldPersist(baseTs + 500L, originator2)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldProcess(baseTs + 500L, originator1)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldProcess(baseTs + 500L, originator2)).isFalse();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
@ -197,11 +197,11 @@ class DeduplicatePersistenceStrategyTest {
 | 
			
		||||
        long maxTs = Long.MAX_VALUE;
 | 
			
		||||
        UUID originator = UUID.randomUUID();
 | 
			
		||||
 | 
			
		||||
        assertThat(strategy.shouldPersist(minTs, originator)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldPersist(minTs + 1L, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldProcess(minTs, originator)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldProcess(minTs + 1L, originator)).isFalse();
 | 
			
		||||
 | 
			
		||||
        assertThat(strategy.shouldPersist(maxTs, originator)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldPersist(maxTs - 1L, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldProcess(maxTs, originator)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldProcess(maxTs - 1L, originator)).isFalse();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
@ -213,22 +213,22 @@ class DeduplicatePersistenceStrategyTest {
 | 
			
		||||
        long firstIntervalEnd = firstIntervalStart + Duration.ofSeconds(deduplicationIntervalSecs).toMillis() - 1L;
 | 
			
		||||
        long firstIntervalMiddle = calculateMiddle(firstIntervalStart, firstIntervalEnd);
 | 
			
		||||
 | 
			
		||||
        assertThat(strategy.shouldPersist(firstIntervalStart, originator)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldPersist(firstIntervalStart + 1, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldPersist(firstIntervalMiddle, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldPersist(firstIntervalEnd - 1, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldPersist(firstIntervalEnd, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldProcess(firstIntervalStart, originator)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldProcess(firstIntervalStart + 1, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldProcess(firstIntervalMiddle, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldProcess(firstIntervalEnd - 1, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldProcess(firstIntervalEnd, originator)).isFalse();
 | 
			
		||||
 | 
			
		||||
        // check 2nd interval
 | 
			
		||||
        long secondIntervalStart = firstIntervalEnd + 1L;
 | 
			
		||||
        long secondIntervalEnd = secondIntervalStart + Duration.ofSeconds(deduplicationIntervalSecs).toMillis() - 1L;
 | 
			
		||||
        long secondIntervalMiddle = calculateMiddle(secondIntervalStart, secondIntervalEnd);
 | 
			
		||||
 | 
			
		||||
        assertThat(strategy.shouldPersist(secondIntervalStart, originator)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldPersist(secondIntervalStart + 1, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldPersist(secondIntervalMiddle, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldPersist(secondIntervalEnd - 1, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldPersist(secondIntervalEnd, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldProcess(secondIntervalStart, originator)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldProcess(secondIntervalStart + 1, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldProcess(secondIntervalMiddle, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldProcess(secondIntervalEnd - 1, originator)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldProcess(secondIntervalEnd, originator)).isFalse();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
@ -238,19 +238,19 @@ class DeduplicatePersistenceStrategyTest {
 | 
			
		||||
        long baseTs = 0L;
 | 
			
		||||
 | 
			
		||||
        // First interval for both originators
 | 
			
		||||
        assertThat(strategy.shouldPersist(baseTs, originator1)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldPersist(baseTs, originator2)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldProcess(baseTs, originator1)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldProcess(baseTs, originator2)).isTrue();
 | 
			
		||||
 | 
			
		||||
        // Move to the next interval
 | 
			
		||||
        long nextIntervalTs = baseTs + Duration.ofSeconds(10).toMillis();
 | 
			
		||||
 | 
			
		||||
        // Each originator should be allowed again in the new interval
 | 
			
		||||
        assertThat(strategy.shouldPersist(nextIntervalTs, originator1)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldPersist(nextIntervalTs, originator2)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldProcess(nextIntervalTs, originator1)).isTrue();
 | 
			
		||||
        assertThat(strategy.shouldProcess(nextIntervalTs, originator2)).isTrue();
 | 
			
		||||
 | 
			
		||||
        // Subsequent calls in the same new interval should return false
 | 
			
		||||
        assertThat(strategy.shouldPersist(nextIntervalTs + 500L, originator1)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldPersist(nextIntervalTs + 500L, originator2)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldProcess(nextIntervalTs + 500L, originator1)).isFalse();
 | 
			
		||||
        assertThat(strategy.shouldProcess(nextIntervalTs + 500L, originator2)).isFalse();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static long calculateMiddle(long start, long end) {
 | 
			
		||||
@ -24,13 +24,13 @@ import java.util.stream.Stream;
 | 
			
		||||
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
 | 
			
		||||
class OnEveryMessagePersistenceStrategyTest {
 | 
			
		||||
class OnEveryMessageProcessingStrategyTest {
 | 
			
		||||
 | 
			
		||||
    @ParameterizedTest
 | 
			
		||||
    @MethodSource("edgeCaseProvider")
 | 
			
		||||
    void shouldAlwaysReturnTrueForAnyInput(long timestamp, UUID originator) {
 | 
			
		||||
        var onEveryMessage = OnEveryMessagePersistenceStrategy.getInstance();
 | 
			
		||||
        assertThat(onEveryMessage.shouldPersist(timestamp, originator)).isTrue();
 | 
			
		||||
        var onEveryMessage = OnEveryMessageProcessingStrategy.getInstance();
 | 
			
		||||
        assertThat(onEveryMessage.shouldProcess(timestamp, originator)).isTrue();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static Stream<Arguments> edgeCaseProvider() {
 | 
			
		||||
@ -22,23 +22,23 @@ import java.time.Duration;
 | 
			
		||||
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
 | 
			
		||||
class PersistenceStrategyTest {
 | 
			
		||||
class ProcessingStrategyTest {
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void testOnEveryMessageReturnsCorrectInstance() {
 | 
			
		||||
        PersistenceStrategy strategy = PersistenceStrategy.onEveryMessage();
 | 
			
		||||
        ProcessingStrategy strategy = ProcessingStrategy.onEveryMessage();
 | 
			
		||||
        assertThat(strategy)
 | 
			
		||||
                .isNotNull()
 | 
			
		||||
                .isInstanceOf(OnEveryMessagePersistenceStrategy.class);
 | 
			
		||||
                .isInstanceOf(OnEveryMessageProcessingStrategy.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void testDeduplicateReturnsCorrectInstance() {
 | 
			
		||||
        int validDeduplicationIntervalSecs = 5;
 | 
			
		||||
        PersistenceStrategy strategy = PersistenceStrategy.deduplicate(validDeduplicationIntervalSecs);
 | 
			
		||||
        ProcessingStrategy strategy = ProcessingStrategy.deduplicate(validDeduplicationIntervalSecs);
 | 
			
		||||
        assertThat(strategy)
 | 
			
		||||
                .isNotNull()
 | 
			
		||||
                .isInstanceOf(DeduplicatePersistenceStrategy.class);
 | 
			
		||||
                .isInstanceOf(DeduplicateProcessingStrategy.class);
 | 
			
		||||
 | 
			
		||||
        long actualDeduplicationIntervalMillis = (long) ReflectionTestUtils.getField(strategy, "deduplicationIntervalMillis");
 | 
			
		||||
        assertThat(actualDeduplicationIntervalMillis).isEqualTo(Duration.ofSeconds(validDeduplicationIntervalSecs).toMillis());
 | 
			
		||||
@ -46,10 +46,10 @@ class PersistenceStrategyTest {
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void testSkipReturnsCorrectInstance() {
 | 
			
		||||
        PersistenceStrategy strategy = PersistenceStrategy.skip();
 | 
			
		||||
        ProcessingStrategy strategy = ProcessingStrategy.skip();
 | 
			
		||||
        assertThat(strategy)
 | 
			
		||||
                .isNotNull()
 | 
			
		||||
                .isInstanceOf(SkipPersistenceStrategy.class);
 | 
			
		||||
                .isInstanceOf(SkipProcessingStrategy.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -24,13 +24,13 @@ import java.util.stream.Stream;
 | 
			
		||||
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
 | 
			
		||||
class SkipPersistenceStrategyTest {
 | 
			
		||||
class SkipProcessingStrategyTest {
 | 
			
		||||
 | 
			
		||||
    @ParameterizedTest
 | 
			
		||||
    @MethodSource("edgeCaseProvider")
 | 
			
		||||
    void shouldAlwaysReturnFalseForAnyInput(long timestamp, UUID originator) {
 | 
			
		||||
        var skipStrategy = SkipPersistenceStrategy.getInstance();
 | 
			
		||||
        assertThat(skipStrategy.shouldPersist(timestamp, originator)).isFalse();
 | 
			
		||||
        var skipStrategy = SkipProcessingStrategy.getInstance();
 | 
			
		||||
        assertThat(skipStrategy.shouldProcess(timestamp, originator)).isFalse();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static Stream<Arguments> edgeCaseProvider() {
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user