replaced new TbMsgMetaData() with TbMsgMetaData.EMPTY and added additional refactoring after review of changes
This commit is contained in:
		
							parent
							
								
									47929ef784
								
							
						
					
					
						commit
						4528348143
					
				@ -71,19 +71,13 @@ public enum TbMsgType {
 | 
			
		||||
 | 
			
		||||
    // tellSelfOnly types
 | 
			
		||||
    GENERATOR_NODE_SELF_MSG(null, true),
 | 
			
		||||
 | 
			
		||||
    DEVICE_PROFILE_PERIODIC_SELF_MSG(null, true),
 | 
			
		||||
    DEVICE_PROFILE_UPDATE_SELF_MSG(null, true),
 | 
			
		||||
    DEVICE_UPDATE_SELF_MSG(null, true),
 | 
			
		||||
 | 
			
		||||
    DEDUPLICATION_TIMEOUT_SELF_MSG(null, true),
 | 
			
		||||
 | 
			
		||||
    DELAY_TIMEOUT_SELF_MSG(null, true),
 | 
			
		||||
 | 
			
		||||
    MSG_COUNT_SELF_MSG(null, true);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    public static final List<String> NODE_CONNECTIONS = EnumSet.allOf(TbMsgType.class).stream()
 | 
			
		||||
            .filter(tbMsgType -> !tbMsgType.isTellSelfOnly())
 | 
			
		||||
            .map(TbMsgType::getRuleNodeConnection)
 | 
			
		||||
 | 
			
		||||
@ -54,7 +54,6 @@ class TbMsgTypeTest {
 | 
			
		||||
            MSG_COUNT_SELF_MSG
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    // backward-compatibility tests
 | 
			
		||||
    
 | 
			
		||||
    @Test
 | 
			
		||||
 | 
			
		||||
@ -136,7 +136,7 @@ public class TbMsgGeneratorNode implements TbNode {
 | 
			
		||||
        }
 | 
			
		||||
        lastScheduledTs = lastScheduledTs + delay;
 | 
			
		||||
        long curDelay = Math.max(0L, (lastScheduledTs - curTs));
 | 
			
		||||
        TbMsg tickMsg = ctx.newMsg(config.getQueueName(), TbMsgType.GENERATOR_NODE_SELF_MSG, ctx.getSelfId(), new TbMsgMetaData(), TbMsg.EMPTY_STRING);
 | 
			
		||||
        TbMsg tickMsg = ctx.newMsg(config.getQueueName(), TbMsgType.GENERATOR_NODE_SELF_MSG, ctx.getSelfId(), TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING);
 | 
			
		||||
        nextTickId = tickMsg.getId();
 | 
			
		||||
        ctx.tellSelf(tickMsg, curDelay);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
@ -98,8 +99,8 @@ public class TbGetTelemetryNode implements TbNode {
 | 
			
		||||
                List<String> keys = TbNodeUtils.processPatterns(tsKeyNames, msg);
 | 
			
		||||
                ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(interval, keys));
 | 
			
		||||
                DonAsynchron.withCallback(list, data -> {
 | 
			
		||||
                    process(data, msg, keys);
 | 
			
		||||
                    ctx.tellSuccess(msg);
 | 
			
		||||
                    var metaData = updateMetadata(data, msg, keys);
 | 
			
		||||
                    ctx.tellSuccess(TbMsg.transformMsg(msg, metaData));
 | 
			
		||||
                }, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor());
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                ctx.tellFailure(msg, e);
 | 
			
		||||
@ -129,19 +130,20 @@ public class TbGetTelemetryNode implements TbNode {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void process(List<TsKvEntry> entries, TbMsg msg, List<String> keys) {
 | 
			
		||||
    private TbMsgMetaData updateMetadata(List<TsKvEntry> entries, TbMsg msg, List<String> keys) {
 | 
			
		||||
        ObjectNode resultNode = JacksonUtil.newObjectNode(JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER);
 | 
			
		||||
        if (TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL.equals(fetchMode)) {
 | 
			
		||||
            entries.forEach(entry -> processArray(resultNode, entry));
 | 
			
		||||
        } else {
 | 
			
		||||
            entries.forEach(entry -> processSingle(resultNode, entry));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        var copy = msg.getMetaData().copy();
 | 
			
		||||
        for (String key : keys) {
 | 
			
		||||
            if (resultNode.has(key)) {
 | 
			
		||||
                msg.getMetaData().putValue(key, resultNode.get(key).toString());
 | 
			
		||||
                copy.putValue(key, resultNode.get(key).toString());
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return copy;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void processSingle(ObjectNode node, TsKvEntry entry) {
 | 
			
		||||
 | 
			
		||||
@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.KvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.msg.TbMsgType;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityKey;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityKeyType;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleNodeState;
 | 
			
		||||
@ -54,18 +55,6 @@ import java.util.concurrent.ConcurrentMap;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ACTIVITY_EVENT;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ALARM_ACK;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ALARM_CLEAR;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ALARM_DELETE;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_UPDATED;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_ASSIGNED;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_UNASSIGNED;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.INACTIVITY_EVENT;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
class DeviceState {
 | 
			
		||||
 | 
			
		||||
@ -147,24 +136,24 @@ class DeviceState {
 | 
			
		||||
            latestValues = fetchLatestValues(ctx, deviceId);
 | 
			
		||||
        }
 | 
			
		||||
        boolean stateChanged = false;
 | 
			
		||||
        if (msg.getType().equals(POST_TELEMETRY_REQUEST.name())) {
 | 
			
		||||
        if (msg.getType().equals(TbMsgType.POST_TELEMETRY_REQUEST.name())) {
 | 
			
		||||
            stateChanged = processTelemetry(ctx, msg);
 | 
			
		||||
        } else if (msg.getType().equals(POST_ATTRIBUTES_REQUEST.name())) {
 | 
			
		||||
        } else if (msg.getType().equals(TbMsgType.POST_ATTRIBUTES_REQUEST.name())) {
 | 
			
		||||
            stateChanged = processAttributesUpdateRequest(ctx, msg);
 | 
			
		||||
        } else if (msg.getType().equals(ACTIVITY_EVENT.name()) || msg.getType().equals(INACTIVITY_EVENT.name())) {
 | 
			
		||||
        } else if (msg.getType().equals(TbMsgType.ACTIVITY_EVENT.name()) || msg.getType().equals(TbMsgType.INACTIVITY_EVENT.name())) {
 | 
			
		||||
            stateChanged = processDeviceActivityEvent(ctx, msg);
 | 
			
		||||
        } else if (msg.getType().equals(ATTRIBUTES_UPDATED.name())) {
 | 
			
		||||
        } else if (msg.getType().equals(TbMsgType.ATTRIBUTES_UPDATED.name())) {
 | 
			
		||||
            stateChanged = processAttributesUpdateNotification(ctx, msg);
 | 
			
		||||
        } else if (msg.getType().equals(ATTRIBUTES_DELETED.name())) {
 | 
			
		||||
        } else if (msg.getType().equals(TbMsgType.ATTRIBUTES_DELETED.name())) {
 | 
			
		||||
            stateChanged = processAttributesDeleteNotification(ctx, msg);
 | 
			
		||||
        } else if (msg.getType().equals(ALARM_CLEAR.name())) {
 | 
			
		||||
        } else if (msg.getType().equals(TbMsgType.ALARM_CLEAR.name())) {
 | 
			
		||||
            stateChanged = processAlarmClearNotification(ctx, msg);
 | 
			
		||||
        } else if (msg.getType().equals(ALARM_ACK.name())) {
 | 
			
		||||
        } else if (msg.getType().equals(TbMsgType.ALARM_ACK.name())) {
 | 
			
		||||
            processAlarmAckNotification(ctx, msg);
 | 
			
		||||
        } else if (msg.getType().equals(ALARM_DELETE.name())) {
 | 
			
		||||
        } else if (msg.getType().equals(TbMsgType.ALARM_DELETE.name())) {
 | 
			
		||||
            processAlarmDeleteNotification(ctx, msg);
 | 
			
		||||
        } else {
 | 
			
		||||
            if (msg.getType().equals(ENTITY_ASSIGNED.name()) || msg.getType().equals(ENTITY_UNASSIGNED.name())) {
 | 
			
		||||
            if (msg.getType().equals(TbMsgType.ENTITY_ASSIGNED.name()) || msg.getType().equals(TbMsgType.ENTITY_UNASSIGNED.name())) {
 | 
			
		||||
                dynamicPredicateValueCtx.resetCustomer();
 | 
			
		||||
            }
 | 
			
		||||
            ctx.tellSuccess(msg);
 | 
			
		||||
 | 
			
		||||
@ -170,7 +170,7 @@ public class TbDeviceProfileNode implements TbNode {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected void scheduleAlarmHarvesting(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        TbMsg periodicCheck = TbMsg.newMsg(TbMsgType.DEVICE_PROFILE_PERIODIC_SELF_MSG, ctx.getTenantId(), msg != null ? msg.getCustomerId() : null, TbMsgMetaData.EMPTY, "{}");
 | 
			
		||||
        TbMsg periodicCheck = TbMsg.newMsg(TbMsgType.DEVICE_PROFILE_PERIODIC_SELF_MSG, ctx.getTenantId(), msg != null ? msg.getCustomerId() : null, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
 | 
			
		||||
        ctx.tellSelf(periodicCheck, TimeUnit.MINUTES.toMillis(1));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -122,7 +122,7 @@ public class TbCreateRelationNodeTest {
 | 
			
		||||
        TbMsgMetaData metaData = new TbMsgMetaData();
 | 
			
		||||
        metaData.putValue("name", "AssetName");
 | 
			
		||||
        metaData.putValue("type", "AssetType");
 | 
			
		||||
        msg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, deviceId, metaData, TbMsgDataType.JSON, "{}", ruleChainId, ruleNodeId);
 | 
			
		||||
        msg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, deviceId, metaData, TbMsgDataType.JSON, TbMsg.EMPTY_JSON_OBJECT, ruleChainId, ruleNodeId);
 | 
			
		||||
 | 
			
		||||
        EntityRelation relation = new EntityRelation();
 | 
			
		||||
        when(ctx.getRelationService().findByToAndTypeAsync(any(), eq(msg.getOriginator()), eq(EntityRelation.CONTAINS_TYPE), eq(RelationTypeGroup.COMMON)))
 | 
			
		||||
 | 
			
		||||
@ -125,7 +125,7 @@ public class TbMsgPushToEdgeNodeTest {
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testMiscEventsProcessedAsTimeseriesUpdated() {
 | 
			
		||||
        for (var event : MISC_EVENTS) {
 | 
			
		||||
            testEvent(event, new TbMsgMetaData(), EdgeEventActionType.TIMESERIES_UPDATED, "data");
 | 
			
		||||
            testEvent(event, TbMsgMetaData.EMPTY, EdgeEventActionType.TIMESERIES_UPDATED, "data");
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -59,7 +59,7 @@ public class TbJsFilterNodeTest {
 | 
			
		||||
    @Test
 | 
			
		||||
    public void falseEvaluationDoNotSendMsg() throws TbNodeException {
 | 
			
		||||
        initWithScript();
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, null, new TbMsgMetaData(), TbMsgDataType.JSON, TbMsg.EMPTY_JSON_OBJECT, ruleChainId, ruleNodeId);
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, null, TbMsgMetaData.EMPTY, TbMsgDataType.JSON, TbMsg.EMPTY_JSON_OBJECT, ruleChainId, ruleNodeId);
 | 
			
		||||
        when(scriptEngine.executeFilterAsync(msg)).thenReturn(Futures.immediateFuture(false));
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
 | 
			
		||||
@ -452,7 +452,7 @@ class TbGpsGeofencingFilterNodeTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TbMsg getEmptyArrayTbMsg(EntityId entityId) {
 | 
			
		||||
        return TbMsg.newMsg(TbMsgType.POST_ATTRIBUTES_REQUEST, entityId, TbMsgMetaData.EMPTY, "[]");
 | 
			
		||||
        return TbMsg.newMsg(TbMsgType.POST_ATTRIBUTES_REQUEST, entityId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -247,7 +247,7 @@ public class TbMathNodeTest {
 | 
			
		||||
                new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b")
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", arg1).put("b", arg2).toString());
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", arg1).put("b", arg2).toString());
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
 | 
			
		||||
@ -270,7 +270,7 @@ public class TbMathNodeTest {
 | 
			
		||||
                new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a")
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", arg1).toString());
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", arg1).toString());
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
 | 
			
		||||
@ -293,7 +293,7 @@ public class TbMathNodeTest {
 | 
			
		||||
                new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b")
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString());
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString());
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
 | 
			
		||||
@ -316,7 +316,7 @@ public class TbMathNodeTest {
 | 
			
		||||
                new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b")
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString());
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString());
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
 | 
			
		||||
@ -340,7 +340,7 @@ public class TbMathNodeTest {
 | 
			
		||||
                new TbMathArgument(TbMathArgumentType.TIME_SERIES, "b")
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().toString());
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().toString());
 | 
			
		||||
 | 
			
		||||
        Mockito.when(attributesService.find(tenantId, originator, DataConstants.SERVER_SCOPE, "a"))
 | 
			
		||||
                .thenReturn(Futures.immediateFuture(Optional.of(new BaseAttributeKvEntry(System.currentTimeMillis(), new DoubleDataEntry("a", 2.0)))));
 | 
			
		||||
@ -368,7 +368,7 @@ public class TbMathNodeTest {
 | 
			
		||||
                new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a")
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 5).toString());
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 5).toString());
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
 | 
			
		||||
@ -390,7 +390,7 @@ public class TbMathNodeTest {
 | 
			
		||||
                new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a")
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 5).toString());
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 5).toString());
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
 | 
			
		||||
@ -412,7 +412,7 @@ public class TbMathNodeTest {
 | 
			
		||||
                new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a")
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 5).toString());
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 5).toString());
 | 
			
		||||
 | 
			
		||||
        Mockito.when(telemetryService.saveAttrAndNotify(any(), any(), anyString(), anyString(), anyDouble()))
 | 
			
		||||
                .thenReturn(Futures.immediateFuture(null));
 | 
			
		||||
@ -438,7 +438,7 @@ public class TbMathNodeTest {
 | 
			
		||||
                new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a")
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 5).toString());
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 5).toString());
 | 
			
		||||
        Mockito.when(telemetryService.saveAndNotify(any(), any(), any(TsKvEntry.class)))
 | 
			
		||||
                .thenReturn(Futures.immediateFuture(null));
 | 
			
		||||
 | 
			
		||||
@ -463,7 +463,7 @@ public class TbMathNodeTest {
 | 
			
		||||
                new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a")
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 5).toString());
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 5).toString());
 | 
			
		||||
        Mockito.when(telemetryService.saveAndNotify(any(), any(), any(TsKvEntry.class)))
 | 
			
		||||
                .thenReturn(Futures.immediateFuture(null));
 | 
			
		||||
 | 
			
		||||
@ -494,7 +494,7 @@ public class TbMathNodeTest {
 | 
			
		||||
                new TbMathResult(TbMathArgumentType.MESSAGE_METADATA, "result", 3, false, false, null),
 | 
			
		||||
                tbMathArgument
 | 
			
		||||
        );
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 10).toString());
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 10).toString());
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
        ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
 | 
			
		||||
@ -514,7 +514,7 @@ public class TbMathNodeTest {
 | 
			
		||||
                new TbMathResult(TbMathArgumentType.TIME_SERIES, "result", 3, true, false, DataConstants.SERVER_SCOPE),
 | 
			
		||||
                new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "TestKey")
 | 
			
		||||
        );
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 10).toString());
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 10).toString());
 | 
			
		||||
        Throwable thrown = assertThrows(RuntimeException.class, () -> {
 | 
			
		||||
            node.onMsg(ctx, msg);
 | 
			
		||||
        });
 | 
			
		||||
@ -528,7 +528,7 @@ public class TbMathNodeTest {
 | 
			
		||||
                new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a")
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, new TbMsgMetaData(), "[]");
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY);
 | 
			
		||||
        Throwable thrown = assertThrows(RuntimeException.class, () -> {
 | 
			
		||||
            node.onMsg(ctx, msg);
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
@ -117,8 +117,7 @@ public class CalculateDeltaNodeTest {
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenInvalidMsgDataType_whenOnMsg_thenShouldTellNextOther() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        var msgData = "[]";
 | 
			
		||||
        var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, msgData);
 | 
			
		||||
        var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY);
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        node.onMsg(ctxMock, msg);
 | 
			
		||||
 | 
			
		||||
@ -241,7 +241,7 @@ public class TbGetAttributesNodeTest {
 | 
			
		||||
    public void givenFetchLatestTimeseriesToDataAndDataIsNotJsonObject_whenOnMsg_thenException() throws Exception {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        node = initNode(FetchTo.DATA, true, true);
 | 
			
		||||
        var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, ORIGINATOR, TbMsgMetaData.EMPTY, "[]");
 | 
			
		||||
        var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, ORIGINATOR, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY);
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        var exception = assertThrows(IllegalArgumentException.class, () -> node.onMsg(ctxMock, msg));
 | 
			
		||||
 | 
			
		||||
@ -208,7 +208,7 @@ public class TbGetCustomerAttributeNodeTest {
 | 
			
		||||
    public void givenMsgDataIsNotAnJsonObjectAndFetchToData_whenOnMsg_thenException() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        node.fetchTo = FetchTo.DATA;
 | 
			
		||||
        msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, "[]");
 | 
			
		||||
        msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY);
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        var exception = assertThrows(IllegalArgumentException.class, () -> node.onMsg(ctxMock, msg));
 | 
			
		||||
 | 
			
		||||
@ -157,7 +157,7 @@ public class TbGetCustomerDetailsNodeTest {
 | 
			
		||||
    public void givenMsgDataIsNotAnJsonObjectAndFetchToData_whenOnMsg_thenException() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        node.fetchTo = FetchTo.DATA;
 | 
			
		||||
        msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, "[]");
 | 
			
		||||
        msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY);
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        var exception = assertThrows(IllegalArgumentException.class, () -> node.onMsg(ctxMock, msg));
 | 
			
		||||
 | 
			
		||||
@ -133,7 +133,7 @@ public class TbGetOriginatorFieldsNodeTest {
 | 
			
		||||
    public void givenMsgDataIsNotAnJsonObjectAndFetchToData_whenOnMsg_thenException() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        node.fetchTo = FetchTo.DATA;
 | 
			
		||||
        msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, "[]");
 | 
			
		||||
        msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY);
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        var exception = assertThrows(IllegalArgumentException.class, () -> node.onMsg(ctxMock, msg));
 | 
			
		||||
 | 
			
		||||
@ -222,7 +222,7 @@ public class TbGetRelatedAttributeNodeTest {
 | 
			
		||||
    public void givenMsgDataIsNotAnJsonObjectAndFetchToData_whenOnMsg_thenException() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        node.fetchTo = FetchTo.DATA;
 | 
			
		||||
        msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, "[]");
 | 
			
		||||
        msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY);
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        var exception = assertThrows(IllegalArgumentException.class, () -> node.onMsg(ctxMock, msg));
 | 
			
		||||
 | 
			
		||||
@ -188,7 +188,7 @@ public class TbGetTenantAttributeNodeTest {
 | 
			
		||||
    public void givenMsgDataIsNotAnJsonObjectAndFetchToData_whenOnMsg_thenException() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        node.fetchTo = FetchTo.DATA;
 | 
			
		||||
        msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, "[]");
 | 
			
		||||
        msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY);
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        var exception = assertThrows(IllegalArgumentException.class, () -> node.onMsg(ctxMock, msg));
 | 
			
		||||
 | 
			
		||||
@ -127,7 +127,7 @@ public class TbGetTenantDetailsNodeTest {
 | 
			
		||||
    public void givenMsgDataIsNotAnJsonObjectAndFetchToData_whenOnMsg_thenException() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        node.fetchTo = FetchTo.DATA;
 | 
			
		||||
        msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, "[]");
 | 
			
		||||
        msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DUMMY_DEVICE_ORIGINATOR, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY);
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        var exception = assertThrows(IllegalArgumentException.class, () -> node.onMsg(ctxMock, msg));
 | 
			
		||||
 | 
			
		||||
@ -107,7 +107,7 @@ public class DeviceStateTest {
 | 
			
		||||
        DeviceState deviceState = createDeviceState(deviceId, alarmConfig);
 | 
			
		||||
 | 
			
		||||
        TbMsg attributeUpdateMsg = TbMsg.newMsg(TbMsgType.POST_ATTRIBUTES_REQUEST,
 | 
			
		||||
                deviceId, new TbMsgMetaData(), "{ \"enabled\": false }");
 | 
			
		||||
                deviceId, TbMsgMetaData.EMPTY, "{ \"enabled\": false }");
 | 
			
		||||
 | 
			
		||||
        deviceState.process(ctx, attributeUpdateMsg);
 | 
			
		||||
 | 
			
		||||
@ -119,7 +119,7 @@ public class DeviceStateTest {
 | 
			
		||||
        reset(ctx);
 | 
			
		||||
 | 
			
		||||
        String deletedAttributes = "{ \"attributes\": [ \"other\" ] }";
 | 
			
		||||
        deviceState.process(ctx, TbMsg.newMsg(TbMsgType.ATTRIBUTES_DELETED, deviceId, new TbMsgMetaData(), deletedAttributes));
 | 
			
		||||
        deviceState.process(ctx, TbMsg.newMsg(TbMsgType.ATTRIBUTES_DELETED, deviceId, TbMsgMetaData.EMPTY, deletedAttributes));
 | 
			
		||||
        verify(ctx, never()).enqueueForTellNext(any(), anyString());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -145,7 +145,7 @@ public class TbHttpClientTest {
 | 
			
		||||
        var httpClient = new TbHttpClient(config, eventLoop);
 | 
			
		||||
        httpClient.setHttpClient(asyncRestTemplate);
 | 
			
		||||
 | 
			
		||||
        var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, new DeviceId(EntityId.NULL_UUID), TbMsgMetaData.EMPTY, "{}");
 | 
			
		||||
        var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, new DeviceId(EntityId.NULL_UUID), TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
 | 
			
		||||
        var successMsg = TbMsg.newMsg(
 | 
			
		||||
                TbMsgType.POST_TELEMETRY_REQUEST, msg.getOriginator(),
 | 
			
		||||
                msg.getMetaData(), msg.getData()
 | 
			
		||||
 | 
			
		||||
@ -79,8 +79,7 @@ public class TbCopyKeysNodeTest {
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenMsgFromMetadata_whenOnMsg_thenVerifyOutput() throws Exception {
 | 
			
		||||
        String data = "{}";
 | 
			
		||||
        node.onMsg(ctx, getTbMsg(deviceId, data));
 | 
			
		||||
        node.onMsg(ctx, getTbMsg(deviceId, TbMsg.EMPTY_JSON_OBJECT));
 | 
			
		||||
 | 
			
		||||
        ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
 | 
			
		||||
        verify(ctx, times(1)).tellSuccess(newMsgCaptor.capture());
 | 
			
		||||
@ -137,8 +136,7 @@ public class TbCopyKeysNodeTest {
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenMsgDataNotJSONObject_whenOnMsg_thenTVerifyOutput() throws Exception {
 | 
			
		||||
        String data = "[]";
 | 
			
		||||
        TbMsg msg = getTbMsg(deviceId, data);
 | 
			
		||||
        TbMsg msg = getTbMsg(deviceId, TbMsg.EMPTY_JSON_ARRAY);
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
 | 
			
		||||
        ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
 | 
			
		||||
 | 
			
		||||
@ -79,8 +79,7 @@ public class TbDeleteKeysNodeTest {
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenMsgFromMetadata_whenOnMsg_thenVerifyOutput() throws Exception {
 | 
			
		||||
        String data = "{}";
 | 
			
		||||
        node.onMsg(ctx, getTbMsg(deviceId, data));
 | 
			
		||||
        node.onMsg(ctx, getTbMsg(deviceId, TbMsg.EMPTY_JSON_OBJECT));
 | 
			
		||||
 | 
			
		||||
        ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
 | 
			
		||||
        verify(ctx, times(1)).tellSuccess(newMsgCaptor.capture());
 | 
			
		||||
 | 
			
		||||
@ -135,8 +135,7 @@ public class TbRenameKeysNodeTest {
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenMsgDataNotJSONObject_whenOnMsg_thenVerifyOutput() throws Exception {
 | 
			
		||||
        String data = "[]";
 | 
			
		||||
        TbMsg msg = getTbMsg(deviceId, data);
 | 
			
		||||
        TbMsg msg = getTbMsg(deviceId, TbMsg.EMPTY_JSON_ARRAY);
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
 | 
			
		||||
        ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
 | 
			
		||||
 | 
			
		||||
@ -83,8 +83,7 @@ public class TbSplitArrayMsgNodeTest {
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenZeroMsg_whenOnMsg_thenVerifyOutput() throws Exception {
 | 
			
		||||
        String data = "[]";
 | 
			
		||||
        VerifyOutputMsg(data);
 | 
			
		||||
        VerifyOutputMsg(TbMsg.EMPTY_JSON_ARRAY);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user