added util method
This commit is contained in:
		
							parent
							
								
									42ba543fb0
								
							
						
					
					
						commit
						9b870126f1
					
				@ -153,7 +153,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void process(EntityCalculatedFieldTelemetryMsg msg) throws CalculatedFieldException {
 | 
			
		||||
        log.info("[{}] Processing CF telemetry msg.", msg.getEntityId());
 | 
			
		||||
        log.debug("[{}] Processing CF telemetry msg.", msg.getEntityId());
 | 
			
		||||
        var proto = msg.getProto();
 | 
			
		||||
        var numberOfCallbacks = CALLBACKS_PER_CF * (msg.getEntityIdFields().size() + msg.getProfileIdFields().size());
 | 
			
		||||
        MultipleTbCallback callback = new MultipleTbCallback(numberOfCallbacks, msg.getCallback());
 | 
			
		||||
@ -168,7 +168,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void process(EntityCalculatedFieldLinkedTelemetryMsg msg) throws CalculatedFieldException {
 | 
			
		||||
        log.info("[{}] Processing CF link telemetry msg.", msg.getEntityId());
 | 
			
		||||
        log.debug("[{}] Processing CF link telemetry msg.", msg.getEntityId());
 | 
			
		||||
        var proto = msg.getProto();
 | 
			
		||||
        var ctx = msg.getCtx();
 | 
			
		||||
        var callback = new MultipleTbCallback(CALLBACKS_PER_CF, msg.getCallback());
 | 
			
		||||
 | 
			
		||||
@ -318,14 +318,14 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
 | 
			
		||||
 | 
			
		||||
    public void onTelemetryMsg(CalculatedFieldTelemetryMsg msg) {
 | 
			
		||||
        EntityId entityId = msg.getEntityId();
 | 
			
		||||
        log.info("Received telemetry msg from entity [{}]", entityId);
 | 
			
		||||
        log.debug("Received telemetry msg from entity [{}]", entityId);
 | 
			
		||||
        // 2 = 1 for CF processing + 1 for links processing
 | 
			
		||||
        MultipleTbCallback callback = new MultipleTbCallback(2, msg.getCallback());
 | 
			
		||||
        // process all cfs related to entity, or it's profile;
 | 
			
		||||
        var entityIdFields = getCalculatedFieldsByEntityId(entityId);
 | 
			
		||||
        var profileIdFields = getCalculatedFieldsByEntityId(getProfileId(tenantId, entityId));
 | 
			
		||||
        if (!entityIdFields.isEmpty() || !profileIdFields.isEmpty()) {
 | 
			
		||||
            log.info("Pushing telemetry msg to specific actor [{}]", entityId);
 | 
			
		||||
            log.debug("Pushing telemetry msg to specific actor [{}]", entityId);
 | 
			
		||||
            getOrCreateActor(entityId).tell(new EntityCalculatedFieldTelemetryMsg(msg, entityIdFields, profileIdFields, callback));
 | 
			
		||||
        } else {
 | 
			
		||||
            callback.onSuccess();
 | 
			
		||||
@ -342,7 +342,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
 | 
			
		||||
 | 
			
		||||
    public void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsg msg) {
 | 
			
		||||
        EntityId sourceEntityId = msg.getEntityId();
 | 
			
		||||
        log.info("Received linked telemetry msg from entity [{}]", sourceEntityId);
 | 
			
		||||
        log.debug("Received linked telemetry msg from entity [{}]", sourceEntityId);
 | 
			
		||||
        var proto = msg.getProto();
 | 
			
		||||
        var linksList = proto.getLinksList();
 | 
			
		||||
        for (var linkProto : linksList) {
 | 
			
		||||
@ -357,14 +357,14 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
 | 
			
		||||
                    MultipleTbCallback callback = new MultipleTbCallback(entityIds.size(), msg.getCallback());
 | 
			
		||||
                    var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, callback);
 | 
			
		||||
                    entityIds.forEach(entityId -> {
 | 
			
		||||
                        log.info("Pushing linked telemetry msg to specific actor [{}]", entityId);
 | 
			
		||||
                        log.debug("Pushing linked telemetry msg to specific actor [{}]", entityId);
 | 
			
		||||
                        getOrCreateActor(entityId).tell(newMsg);
 | 
			
		||||
                    });
 | 
			
		||||
                } else {
 | 
			
		||||
                    msg.getCallback().onSuccess();
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
                log.info("Pushing linked telemetry msg to specific actor [{}]", targetEntityId);
 | 
			
		||||
                log.debug("Pushing linked telemetry msg to specific actor [{}]", targetEntityId);
 | 
			
		||||
                var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, msg.getCallback());
 | 
			
		||||
                getOrCreateActor(targetEntityId).tell(newMsg);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
@ -34,7 +34,6 @@ import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.util.TbPair;
 | 
			
		||||
import org.thingsboard.server.common.util.ProtoUtils;
 | 
			
		||||
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
 | 
			
		||||
@ -44,7 +43,6 @@ import java.util.ArrayList;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
public class CalculatedFieldCtx {
 | 
			
		||||
@ -58,8 +56,6 @@ public class CalculatedFieldCtx {
 | 
			
		||||
    private final Map<String, Argument> arguments;
 | 
			
		||||
    private final Map<ReferencedEntityKey, String> mainEntityArguments;
 | 
			
		||||
    private final Map<EntityId, Map<ReferencedEntityKey, String>> linkedEntityArguments;
 | 
			
		||||
 | 
			
		||||
    private final Map<TbPair<EntityId, ReferencedEntityKey>, String> referencedEntityKeys;
 | 
			
		||||
    private final List<String> argNames;
 | 
			
		||||
    private Output output;
 | 
			
		||||
    private String expression;
 | 
			
		||||
@ -93,11 +89,6 @@ public class CalculatedFieldCtx {
 | 
			
		||||
                linkedEntityArguments.computeIfAbsent(refId, key -> new HashMap<>()).put(refKey, entry.getKey());
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        this.referencedEntityKeys = arguments.entrySet().stream()
 | 
			
		||||
                .collect(Collectors.toMap(
 | 
			
		||||
                        entry -> new TbPair<>(entry.getValue().getRefEntityId() == null ? entityId : entry.getValue().getRefEntityId(), entry.getValue().getRefEntityKey()),
 | 
			
		||||
                        Map.Entry::getKey
 | 
			
		||||
                ));
 | 
			
		||||
        this.argNames = new ArrayList<>(arguments.keySet());
 | 
			
		||||
        this.output = configuration.getOutput();
 | 
			
		||||
        this.expression = configuration.getExpression();
 | 
			
		||||
 | 
			
		||||
@ -18,8 +18,6 @@ package org.thingsboard.script.api.tbel;
 | 
			
		||||
import com.google.common.primitives.Bytes;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.apache.commons.lang3.ArrayUtils;
 | 
			
		||||
import org.mvel2.ConversionHandler;
 | 
			
		||||
import org.mvel2.DataConversion;
 | 
			
		||||
import org.mvel2.ExecutionContext;
 | 
			
		||||
import org.mvel2.ParserConfiguration;
 | 
			
		||||
import org.mvel2.execution.ExecutionArrayList;
 | 
			
		||||
@ -259,6 +257,8 @@ public class TbUtils {
 | 
			
		||||
                float.class, int.class)));
 | 
			
		||||
        parserConfig.addImport("toInt", new MethodStub(TbUtils.class.getMethod("toInt",
 | 
			
		||||
                double.class)));
 | 
			
		||||
        parserConfig.addImport("isNaN", new MethodStub(TbUtils.class.getMethod("isNaN",
 | 
			
		||||
                double.class)));
 | 
			
		||||
        parserConfig.addImport("hexToBytes", new MethodStub(TbUtils.class.getMethod("hexToBytes",
 | 
			
		||||
                ExecutionContext.class, String.class)));
 | 
			
		||||
        parserConfig.addImport("hexToBytesArray", new MethodStub(TbUtils.class.getMethod("hexToBytesArray",
 | 
			
		||||
@ -1163,6 +1163,10 @@ public class TbUtils {
 | 
			
		||||
        return BigDecimal.valueOf(value).setScale(0, RoundingMode.HALF_UP).intValue();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static boolean isNaN(double value) {
 | 
			
		||||
        return Double.isNaN(value);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static ExecutionHashMap<String, Object> toFlatMap(ExecutionContext ctx, Map<String, Object> json) {
 | 
			
		||||
        return toFlatMap(ctx, json, new ArrayList<>(), true);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -1138,12 +1138,18 @@ public class TbUtilsTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void toInt() {
 | 
			
		||||
    public void toInt() {
 | 
			
		||||
        Assertions.assertEquals(1729, TbUtils.toInt(doubleVal));
 | 
			
		||||
        Assertions.assertEquals(13, TbUtils.toInt(12.8));
 | 
			
		||||
        Assertions.assertEquals(28, TbUtils.toInt(28.0));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void isNaN() {
 | 
			
		||||
        Assertions.assertFalse(TbUtils.isNaN(doubleVal));
 | 
			
		||||
        Assertions.assertTrue(TbUtils.isNaN(Double.NaN));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static List<Byte> toList(byte[] data) {
 | 
			
		||||
        List<Byte> result = new ArrayList<>(data.length);
 | 
			
		||||
        for (Byte b : data) {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user