added last records type of cf
This commit is contained in:
		
							parent
							
								
									2c7c6f0c5e
								
							
						
					
					
						commit
						c6d91c4ce8
					
				@ -48,12 +48,16 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.Aggregation;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.KvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.msg.TbMsgType;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageDataIterable;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
@ -69,7 +73,10 @@ import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
import org.thingsboard.server.service.cf.ctx.CalculatedFieldCtx;
 | 
			
		||||
import org.thingsboard.server.service.cf.ctx.CalculatedFieldCtxId;
 | 
			
		||||
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
 | 
			
		||||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
 | 
			
		||||
import org.thingsboard.server.service.cf.ctx.state.CalculationContext;
 | 
			
		||||
import org.thingsboard.server.service.cf.ctx.state.LastRecordsCalculatedFieldState;
 | 
			
		||||
import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState;
 | 
			
		||||
import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState;
 | 
			
		||||
import org.thingsboard.server.service.partition.AbstractPartitionBasedService;
 | 
			
		||||
@ -84,6 +91,7 @@ import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentMap;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.server.common.data.DataConstants.SCOPE;
 | 
			
		||||
 | 
			
		||||
@ -109,6 +117,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
    private final ConcurrentMap<CalculatedFieldId, List<CalculatedFieldLink>> calculatedFieldLinks = new ConcurrentHashMap<>();
 | 
			
		||||
    private final ConcurrentMap<CalculatedFieldCtxId, CalculatedFieldCtx> states = new ConcurrentHashMap<>();
 | 
			
		||||
 | 
			
		||||
    private static final int MAX_LAST_RECORDS_VALUE = 1024;
 | 
			
		||||
 | 
			
		||||
    @Value("${calculatedField.initFetchPackSize:50000}")
 | 
			
		||||
    @Getter
 | 
			
		||||
    private int initFetchPackSize;
 | 
			
		||||
@ -215,7 +225,19 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
    public void onTelemetryUpdate(TenantId tenantId, CalculatedFieldId calculatedFieldId, Map<String, KvEntry> updatedTelemetry) {
 | 
			
		||||
        try {
 | 
			
		||||
            CalculatedField calculatedField = calculatedFields.computeIfAbsent(calculatedFieldId, id -> calculatedFieldService.findById(tenantId, id));
 | 
			
		||||
            updateOrInitializeState(calculatedField, calculatedField.getEntityId(), updatedTelemetry);
 | 
			
		||||
            Map<String, ArgumentEntry> argumentValues = updatedTelemetry.entrySet().stream()
 | 
			
		||||
                    .collect(Collectors.toMap(
 | 
			
		||||
                            Map.Entry::getKey,
 | 
			
		||||
                            entry -> {
 | 
			
		||||
                                ArgumentEntry argumentEntry = new ArgumentEntry();
 | 
			
		||||
                                argumentEntry.setKvEntry(entry.getValue());
 | 
			
		||||
                                if (entry.getValue() instanceof TsKvEntry) {
 | 
			
		||||
                                    argumentEntry.setKvEntries(List.of((TsKvEntry) entry.getValue()));
 | 
			
		||||
                                }
 | 
			
		||||
                                return argumentEntry;
 | 
			
		||||
                            }
 | 
			
		||||
                    ));
 | 
			
		||||
            updateOrInitializeState(calculatedField, calculatedField.getEntityId(), argumentValues);
 | 
			
		||||
            log.info("Successfully updated time series for calculatedFieldId: [{}]", calculatedFieldId);
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.trace("Failed to update telemetry for calculatedFieldId: [{}]", calculatedFieldId, e);
 | 
			
		||||
@ -308,12 +330,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
 | 
			
		||||
    private void initializeStateForEntity(TenantId tenantId, CalculatedField calculatedField, EntityId entityId, TbCallback callback) {
 | 
			
		||||
        Map<String, Argument> arguments = calculatedField.getConfiguration().getArguments();
 | 
			
		||||
        Map<String, KvEntry> argumentValues = new HashMap<>();
 | 
			
		||||
        Map<String, ArgumentEntry> argumentValues = new HashMap<>();
 | 
			
		||||
        AtomicInteger remaining = new AtomicInteger(arguments.size());
 | 
			
		||||
        arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(tenantId, argument, entityId), new FutureCallback<>() {
 | 
			
		||||
        arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(calculatedField, argument), new FutureCallback<>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(Optional<? extends KvEntry> result) {
 | 
			
		||||
                argumentValues.put(key, result.orElse(null));
 | 
			
		||||
            public void onSuccess(ArgumentEntry result) {
 | 
			
		||||
                argumentValues.put(key, result);
 | 
			
		||||
                if (remaining.decrementAndGet() == 0) {
 | 
			
		||||
                    updateOrInitializeState(calculatedField, entityId, argumentValues);
 | 
			
		||||
                }
 | 
			
		||||
@ -327,10 +349,37 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
        }, calculatedFieldCallbackExecutor));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Optional<? extends KvEntry>> fetchArgumentValue(TenantId tenantId, Argument argument, EntityId targetEntityId) {
 | 
			
		||||
    private ListenableFuture<ArgumentEntry> fetchArgumentValue(CalculatedField calculatedField, Argument argument) {
 | 
			
		||||
        TenantId tenantId = calculatedField.getTenantId();
 | 
			
		||||
        EntityId cfEntityId = calculatedField.getEntityId();
 | 
			
		||||
        EntityId argumentEntityId = argument.getEntityId();
 | 
			
		||||
        EntityId entityId = EntityType.DEVICE_PROFILE.equals(argumentEntityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(argumentEntityId.getEntityType()) ? targetEntityId : argumentEntityId;
 | 
			
		||||
        return switch (argument.getType()) {
 | 
			
		||||
        EntityId entityId = EntityType.DEVICE_PROFILE.equals(argumentEntityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(argumentEntityId.getEntityType())
 | 
			
		||||
                ? cfEntityId
 | 
			
		||||
                : argumentEntityId;
 | 
			
		||||
        if (CalculatedFieldType.LAST_RECORDS.equals(calculatedField.getType())) {
 | 
			
		||||
            return fetchLastRecords(tenantId, entityId, argument);
 | 
			
		||||
        }
 | 
			
		||||
        return fetchKvEntry(tenantId, entityId, argument);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<ArgumentEntry> fetchLastRecords(TenantId tenantId, EntityId entityId, Argument argument) {
 | 
			
		||||
        long startTs = Math.max(argument.getStartTs(), 0);
 | 
			
		||||
        long timeWindow = argument.getTimeWindow() == 0 ? System.currentTimeMillis() : argument.getTimeWindow();
 | 
			
		||||
        long endTs = startTs + timeWindow;
 | 
			
		||||
        int limit = argument.getLimit() == 0 ? MAX_LAST_RECORDS_VALUE : argument.getLimit();
 | 
			
		||||
 | 
			
		||||
        ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getKey(), startTs, endTs, 0, limit, Aggregation.NONE);
 | 
			
		||||
        ListenableFuture<List<TsKvEntry>> lastRecordsFuture = timeseriesService.findAll(tenantId, entityId, List.of(query));
 | 
			
		||||
 | 
			
		||||
        return Futures.transform(lastRecordsFuture, lastRecords -> {
 | 
			
		||||
            ArgumentEntry argumentEntry = new ArgumentEntry();
 | 
			
		||||
            argumentEntry.setKvEntries(lastRecords);
 | 
			
		||||
            return argumentEntry;
 | 
			
		||||
        }, calculatedFieldExecutor);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<ArgumentEntry> fetchKvEntry(TenantId tenantId, EntityId entityId, Argument argument) {
 | 
			
		||||
        ListenableFuture<Optional<? extends KvEntry>> kvEntryFuture = switch (argument.getType()) {
 | 
			
		||||
            case "ATTRIBUTES" -> Futures.transform(
 | 
			
		||||
                    attributesService.find(tenantId, entityId, argument.getScope(), argument.getKey()),
 | 
			
		||||
                    result -> result.or(() -> Optional.of(
 | 
			
		||||
@ -342,9 +391,16 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
                    result -> result.or(() -> Optional.of(
 | 
			
		||||
                            new BasicTsKvEntry(System.currentTimeMillis(), createDefaultKvEntry(argument))
 | 
			
		||||
                    )),
 | 
			
		||||
                    MoreExecutors.directExecutor());
 | 
			
		||||
                    calculatedFieldExecutor);
 | 
			
		||||
            default -> throw new IllegalArgumentException("Invalid argument type '" + argument.getType() + "'.");
 | 
			
		||||
        };
 | 
			
		||||
        return Futures.transform(kvEntryFuture, kvEntry -> {
 | 
			
		||||
            ArgumentEntry argumentEntry = new ArgumentEntry();
 | 
			
		||||
            if (kvEntry.isPresent()) {
 | 
			
		||||
                argumentEntry.setKvEntry(kvEntry.orElse(null));
 | 
			
		||||
            }
 | 
			
		||||
            return argumentEntry;
 | 
			
		||||
        }, calculatedFieldExecutor);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private KvEntry createDefaultKvEntry(Argument argument) {
 | 
			
		||||
@ -359,7 +415,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
        return new StringDataEntry(key, defaultValue);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void updateOrInitializeState(CalculatedField calculatedField, EntityId entityId, Map<String, KvEntry> argumentValues) {
 | 
			
		||||
    private void updateOrInitializeState(CalculatedField calculatedField, EntityId entityId, Map<String, ArgumentEntry> argumentValues) {
 | 
			
		||||
        CalculatedFieldCtxId ctxId = new CalculatedFieldCtxId(calculatedField.getUuidId(), entityId.getId());
 | 
			
		||||
        CalculatedFieldCtx calculatedFieldCtx = states.computeIfAbsent(ctxId, ctx -> new CalculatedFieldCtx(ctxId, null));
 | 
			
		||||
 | 
			
		||||
@ -373,7 +429,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
        states.put(ctxId, calculatedFieldCtx);
 | 
			
		||||
        rocksDBService.put(JacksonUtil.writeValueAsString(ctxId), JacksonUtil.writeValueAsString(calculatedFieldCtx));
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<CalculatedFieldResult> resultFuture = state.performCalculation(calculatedField.getTenantId(), calculatedField.getConfiguration(), tbelInvokeService);
 | 
			
		||||
        CalculationContext ctx = CalculationContext.builder()
 | 
			
		||||
                .tenantId(calculatedField.getTenantId())
 | 
			
		||||
                .configuration(calculatedField.getConfiguration())
 | 
			
		||||
                .tbelInvokeService(tbelInvokeService)
 | 
			
		||||
                .build();
 | 
			
		||||
        ListenableFuture<CalculatedFieldResult> resultFuture = state.performCalculation(ctx);
 | 
			
		||||
        Futures.addCallback(resultFuture, new FutureCallback<>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(CalculatedFieldResult result) {
 | 
			
		||||
@ -414,6 +475,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
        return switch (calculatedFieldType) {
 | 
			
		||||
            case SIMPLE -> new SimpleCalculatedFieldState();
 | 
			
		||||
            case SCRIPT -> new ScriptCalculatedFieldState();
 | 
			
		||||
            case LAST_RECORDS -> new LastRecordsCalculatedFieldState();
 | 
			
		||||
        };
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,30 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2024 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.cf.ctx.state;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.KvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
public class ArgumentEntry {
 | 
			
		||||
 | 
			
		||||
    private KvEntry kvEntry;
 | 
			
		||||
    private List<TsKvEntry> kvEntries;
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -19,11 +19,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 | 
			
		||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
 | 
			
		||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import org.thingsboard.script.api.tbel.TbelInvokeService;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.Argument;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.KvEntry;
 | 
			
		||||
import org.thingsboard.server.service.cf.CalculatedFieldResult;
 | 
			
		||||
 | 
			
		||||
@ -36,7 +33,8 @@ import java.util.Map;
 | 
			
		||||
)
 | 
			
		||||
@JsonSubTypes({
 | 
			
		||||
        @JsonSubTypes.Type(value = SimpleCalculatedFieldState.class, name = "SIMPLE"),
 | 
			
		||||
        @JsonSubTypes.Type(value = ScriptCalculatedFieldState.class, name = "SCRIPT")
 | 
			
		||||
        @JsonSubTypes.Type(value = ScriptCalculatedFieldState.class, name = "SCRIPT"),
 | 
			
		||||
        @JsonSubTypes.Type(value = LastRecordsCalculatedFieldState.class, name = "LAST_RECORDS")
 | 
			
		||||
})
 | 
			
		||||
public interface CalculatedFieldState {
 | 
			
		||||
 | 
			
		||||
@ -47,8 +45,8 @@ public interface CalculatedFieldState {
 | 
			
		||||
        return argumentValues.keySet().containsAll(arguments.keySet());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void initState(Map<String, KvEntry> argumentValues);
 | 
			
		||||
    void initState(Map<String, ArgumentEntry> argumentValues);
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<CalculatedFieldResult> performCalculation(TenantId tenantId, CalculatedFieldConfiguration calculatedFieldConfiguration, TbelInvokeService tbelInvokeService);
 | 
			
		||||
    ListenableFuture<CalculatedFieldResult> performCalculation(CalculationContext ctx);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,35 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2024 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.cf.ctx.state;
 | 
			
		||||
 | 
			
		||||
import lombok.Builder;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.script.api.tbel.TbelInvokeService;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.KvEntry;
 | 
			
		||||
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
@Builder
 | 
			
		||||
public class CalculationContext {
 | 
			
		||||
 | 
			
		||||
    private TenantId tenantId;
 | 
			
		||||
    private CalculatedFieldConfiguration configuration;
 | 
			
		||||
    private TbelInvokeService tbelInvokeService;
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,91 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2024 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.cf.ctx.state;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.Argument;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.Output;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.service.cf.CalculatedFieldResult;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Comparator;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
public class LastRecordsCalculatedFieldState implements CalculatedFieldState {
 | 
			
		||||
 | 
			
		||||
    private Map<String, List<TsKvEntry>> arguments;
 | 
			
		||||
 | 
			
		||||
    public LastRecordsCalculatedFieldState() {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public CalculatedFieldType getType() {
 | 
			
		||||
        return CalculatedFieldType.LAST_RECORDS;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void initState(Map<String, ArgumentEntry> argumentValues) {
 | 
			
		||||
        if (arguments == null) {
 | 
			
		||||
            arguments = new HashMap<>();
 | 
			
		||||
        }
 | 
			
		||||
        argumentValues.forEach((key, argumentEntry) -> {
 | 
			
		||||
            List<TsKvEntry> tsKvEntryList = arguments.computeIfAbsent(key, k -> new ArrayList<>());
 | 
			
		||||
            tsKvEntryList.addAll(argumentEntry.getKvEntries());
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<CalculatedFieldResult> performCalculation(CalculationContext ctx) {
 | 
			
		||||
        CalculatedFieldConfiguration configuration = ctx.getConfiguration();
 | 
			
		||||
        Map<String, Argument> configArguments = configuration.getArguments();
 | 
			
		||||
        Output output = configuration.getOutput();
 | 
			
		||||
 | 
			
		||||
        Map<String, Object> resultMap = new HashMap<>();
 | 
			
		||||
 | 
			
		||||
        arguments.replaceAll((key, entries) -> {
 | 
			
		||||
            int limit = configArguments.get(key).getLimit();
 | 
			
		||||
            List<TsKvEntry> limitedEntries = entries.stream()
 | 
			
		||||
                    .sorted(Comparator.comparingLong(TsKvEntry::getTs).reversed())
 | 
			
		||||
                    .limit(limit)
 | 
			
		||||
                    .collect(Collectors.toList());
 | 
			
		||||
 | 
			
		||||
            Map<Long, Object> valueWithTs = limitedEntries.stream()
 | 
			
		||||
                    .collect(Collectors.toMap(TsKvEntry::getTs, TsKvEntry::getValue));
 | 
			
		||||
            resultMap.put(key, valueWithTs);
 | 
			
		||||
 | 
			
		||||
            return limitedEntries;
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        CalculatedFieldResult calculatedFieldResult = new CalculatedFieldResult();
 | 
			
		||||
        calculatedFieldResult.setType(output.getType());
 | 
			
		||||
        calculatedFieldResult.setScope(output.getScope());
 | 
			
		||||
        calculatedFieldResult.setResultMap(resultMap);
 | 
			
		||||
 | 
			
		||||
        return Futures.immediateFuture(calculatedFieldResult);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -40,7 +40,7 @@ public class ScriptCalculatedFieldState implements CalculatedFieldState {
 | 
			
		||||
    @JsonIgnore
 | 
			
		||||
    private CalculatedFieldScriptEngine calculatedFieldScriptEngine;
 | 
			
		||||
 | 
			
		||||
    private Map<String, KvEntry> arguments = new HashMap<>();
 | 
			
		||||
    private Map<String, KvEntry> arguments;
 | 
			
		||||
 | 
			
		||||
    public ScriptCalculatedFieldState() {
 | 
			
		||||
    }
 | 
			
		||||
@ -50,22 +50,27 @@ public class ScriptCalculatedFieldState implements CalculatedFieldState {
 | 
			
		||||
        return CalculatedFieldType.SCRIPT;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void initState(Map<String, KvEntry> argumentValues) {
 | 
			
		||||
        if (arguments == null) {
 | 
			
		||||
            this.arguments = new HashMap<>();
 | 
			
		||||
        }
 | 
			
		||||
        this.arguments.putAll(argumentValues);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<CalculatedFieldResult> performCalculation(TenantId tenantId, CalculatedFieldConfiguration calculatedFieldConfiguration, TbelInvokeService tbelInvokeService) {
 | 
			
		||||
    public void initState(Map<String, ArgumentEntry> argumentValues) {
 | 
			
		||||
        if (arguments == null) {
 | 
			
		||||
            arguments = new HashMap<>();
 | 
			
		||||
        }
 | 
			
		||||
        argumentValues.forEach((key, value) -> arguments.put(key, value.getKvEntry()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<CalculatedFieldResult> performCalculation(CalculationContext ctx) {
 | 
			
		||||
        CalculatedFieldConfiguration calculatedFieldConfiguration = ctx.getConfiguration();
 | 
			
		||||
        TbelInvokeService tbelInvokeService = ctx.getTbelInvokeService();
 | 
			
		||||
 | 
			
		||||
        if (tbelInvokeService == null) {
 | 
			
		||||
            throw new IllegalArgumentException("TBEL script engine is disabled!");
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (calculatedFieldScriptEngine == null) {
 | 
			
		||||
            initEngine(tenantId, calculatedFieldConfiguration, tbelInvokeService);
 | 
			
		||||
            initEngine(ctx.getTenantId(), calculatedFieldConfiguration, tbelInvokeService);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<Object> resultFuture = calculatedFieldScriptEngine.executeScriptAsync(arguments);
 | 
			
		||||
 | 
			
		||||
@ -20,12 +20,10 @@ import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import net.objecthunter.exp4j.Expression;
 | 
			
		||||
import net.objecthunter.exp4j.ExpressionBuilder;
 | 
			
		||||
import org.thingsboard.script.api.tbel.TbelInvokeService;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.Argument;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.Output;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.KvEntry;
 | 
			
		||||
import org.thingsboard.server.service.cf.CalculatedFieldResult;
 | 
			
		||||
 | 
			
		||||
@ -37,21 +35,26 @@ public class SimpleCalculatedFieldState implements CalculatedFieldState {
 | 
			
		||||
 | 
			
		||||
    private Map<String, KvEntry> arguments;
 | 
			
		||||
 | 
			
		||||
    public SimpleCalculatedFieldState() {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public CalculatedFieldType getType() {
 | 
			
		||||
        return CalculatedFieldType.SIMPLE;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void initState(Map<String, KvEntry> argumentValues) {
 | 
			
		||||
    public void initState(Map<String, ArgumentEntry> argumentValues) {
 | 
			
		||||
        if (arguments == null) {
 | 
			
		||||
            arguments = new HashMap<>();
 | 
			
		||||
        }
 | 
			
		||||
        arguments.putAll(argumentValues);
 | 
			
		||||
        argumentValues.forEach((key, value) -> arguments.put(key, value.getKvEntry()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<CalculatedFieldResult> performCalculation(TenantId tenantId, CalculatedFieldConfiguration calculatedFieldConfiguration, TbelInvokeService tbelInvokeService) {
 | 
			
		||||
    public ListenableFuture<CalculatedFieldResult> performCalculation(CalculationContext ctx) {
 | 
			
		||||
        CalculatedFieldConfiguration calculatedFieldConfiguration = ctx.getConfiguration();
 | 
			
		||||
 | 
			
		||||
        Output output = calculatedFieldConfiguration.getOutput();
 | 
			
		||||
        Map<String, Argument> arguments = calculatedFieldConfiguration.getArguments();
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -17,6 +17,6 @@ package org.thingsboard.server.common.data.cf;
 | 
			
		||||
 | 
			
		||||
public enum CalculatedFieldType {
 | 
			
		||||
 | 
			
		||||
    SIMPLE, SCRIPT
 | 
			
		||||
    SIMPLE, SCRIPT, LAST_RECORDS
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -29,6 +29,7 @@ public class Argument {
 | 
			
		||||
    private String defaultValue;
 | 
			
		||||
 | 
			
		||||
    private int limit;
 | 
			
		||||
    private long startTs;
 | 
			
		||||
    private long timeWindow;
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -101,6 +101,9 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel
 | 
			
		||||
            argumentNode.put("type", argument.getType());
 | 
			
		||||
            argumentNode.put("scope", String.valueOf(argument.getScope()));
 | 
			
		||||
            argumentNode.put("defaultValue", argument.getDefaultValue());
 | 
			
		||||
            argumentNode.put("limit", String.valueOf(argument.getLimit()));
 | 
			
		||||
            argumentNode.put("startTs", String.valueOf(argument.getStartTs()));
 | 
			
		||||
            argumentNode.put("timeWindow", String.valueOf(argument.getTimeWindow()));
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        if (expression != null) {
 | 
			
		||||
@ -144,7 +147,18 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel
 | 
			
		||||
                if (scope != null && !scope.isNull() && !scope.asText().equals("null")) {
 | 
			
		||||
                    argument.setScope(AttributeScope.valueOf(scope.asText()));
 | 
			
		||||
                }
 | 
			
		||||
                argument.setDefaultValue(argumentNode.get("defaultValue").asText());
 | 
			
		||||
                if (argumentNode.hasNonNull("defaultValue")) {
 | 
			
		||||
                    argument.setDefaultValue(argumentNode.get("defaultValue").asText());
 | 
			
		||||
                }
 | 
			
		||||
                if (argumentNode.hasNonNull("limit")) {
 | 
			
		||||
                    argument.setLimit(argumentNode.get("limit").asInt());
 | 
			
		||||
                }
 | 
			
		||||
                if (argumentNode.hasNonNull("startTs")) {
 | 
			
		||||
                    argument.setStartTs(argumentNode.get("startTs").asLong());
 | 
			
		||||
                }
 | 
			
		||||
                if (argumentNode.hasNonNull("timeWindow")) {
 | 
			
		||||
                    argument.setTimeWindow(argumentNode.get("timeWindow").asInt());
 | 
			
		||||
                }
 | 
			
		||||
                arguments.put(key, argument);
 | 
			
		||||
            });
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -35,7 +35,8 @@ import java.util.UUID;
 | 
			
		||||
)
 | 
			
		||||
@JsonSubTypes({
 | 
			
		||||
        @JsonSubTypes.Type(value = SimpleCalculatedFieldConfiguration.class, name = "SIMPLE"),
 | 
			
		||||
        @JsonSubTypes.Type(value = ScriptCalculatedFieldConfiguration.class, name = "SCRIPT")
 | 
			
		||||
        @JsonSubTypes.Type(value = ScriptCalculatedFieldConfiguration.class, name = "SCRIPT"),
 | 
			
		||||
        @JsonSubTypes.Type(value = LastRecordsCalculatedFieldConfiguration.class, name = "LAST_RECORDS")
 | 
			
		||||
})
 | 
			
		||||
public interface CalculatedFieldConfiguration {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,39 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2024 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.data.cf.configuration;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
public class LastRecordsCalculatedFieldConfiguration extends BaseCalculatedFieldConfiguration implements CalculatedFieldConfiguration {
 | 
			
		||||
 | 
			
		||||
    public LastRecordsCalculatedFieldConfiguration() {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public LastRecordsCalculatedFieldConfiguration(JsonNode config, EntityType entityType, UUID entityId) {
 | 
			
		||||
        super(config, entityType, entityId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public CalculatedFieldType getType() {
 | 
			
		||||
        return CalculatedFieldType.LAST_RECORDS;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -24,8 +24,9 @@ import lombok.Data;
 | 
			
		||||
import lombok.EqualsAndHashCode;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.CalculatedField;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.LastRecordsCalculatedFieldConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.ScriptCalculatedFieldConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
 | 
			
		||||
@ -120,14 +121,11 @@ public class CalculatedFieldEntity extends BaseSqlEntity<CalculatedField> implem
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private CalculatedFieldConfiguration readCalculatedFieldConfiguration(JsonNode config, EntityType entityType, UUID entityId) {
 | 
			
		||||
        switch (CalculatedFieldType.valueOf(type)) {
 | 
			
		||||
            case SIMPLE:
 | 
			
		||||
                return new SimpleCalculatedFieldConfiguration(config, entityType, entityId);
 | 
			
		||||
            case SCRIPT:
 | 
			
		||||
                return new ScriptCalculatedFieldConfiguration(config, entityType, entityId);
 | 
			
		||||
            default:
 | 
			
		||||
                throw new IllegalArgumentException("Unsupported calculated field type: " + type + "!");
 | 
			
		||||
        }
 | 
			
		||||
        return switch (CalculatedFieldType.valueOf(type)) {
 | 
			
		||||
            case SIMPLE -> new SimpleCalculatedFieldConfiguration(config, entityType, entityId);
 | 
			
		||||
            case SCRIPT -> new ScriptCalculatedFieldConfiguration(config, entityType, entityId);
 | 
			
		||||
            case LAST_RECORDS -> new LastRecordsCalculatedFieldConfiguration(config, entityType, entityId);
 | 
			
		||||
        };
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.CalculatedFieldLinkConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.LastRecordsCalculatedFieldConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.ScriptCalculatedFieldConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
 | 
			
		||||
@ -136,14 +137,11 @@ public class DefaultNativeCalculatedFieldRepository implements NativeCalculatedF
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private CalculatedFieldConfiguration readCalculatedFieldConfiguration(CalculatedFieldType type, JsonNode config, EntityType entityType, UUID entityId) {
 | 
			
		||||
        switch (type) {
 | 
			
		||||
            case SIMPLE:
 | 
			
		||||
                return new SimpleCalculatedFieldConfiguration(config, entityType, entityId);
 | 
			
		||||
            case SCRIPT:
 | 
			
		||||
                return new ScriptCalculatedFieldConfiguration(config, entityType, entityId);
 | 
			
		||||
            default:
 | 
			
		||||
                throw new IllegalArgumentException("Unsupported calculated field type: " + type + "!");
 | 
			
		||||
        }
 | 
			
		||||
        return switch (type) {
 | 
			
		||||
            case SIMPLE -> new SimpleCalculatedFieldConfiguration(config, entityType, entityId);
 | 
			
		||||
            case SCRIPT -> new ScriptCalculatedFieldConfiguration(config, entityType, entityId);
 | 
			
		||||
            case LAST_RECORDS -> new LastRecordsCalculatedFieldConfiguration(config, entityType, entityId);
 | 
			
		||||
        };
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user