Merge pull request #13072 from dashevchenko/deviceStateFix
Fix manually added telemetry not triggerring alarm
This commit is contained in:
		
						commit
						a4dd7406e1
					
				@ -15,6 +15,7 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.profile;
 | 
			
		||||
 | 
			
		||||
import com.google.gson.JsonElement;
 | 
			
		||||
import com.google.gson.JsonParser;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
@ -49,6 +50,7 @@ import java.util.HashMap;
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentMap;
 | 
			
		||||
@ -66,6 +68,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_UNASSIGNED
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.INACTIVITY_EVENT;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST;
 | 
			
		||||
import static org.thingsboard.server.common.data.msg.TbMsgType.TIMESERIES_UPDATED;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
class DeviceState {
 | 
			
		||||
@ -149,7 +152,9 @@ class DeviceState {
 | 
			
		||||
        }
 | 
			
		||||
        boolean stateChanged = false;
 | 
			
		||||
        if (msg.isTypeOf(POST_TELEMETRY_REQUEST)) {
 | 
			
		||||
            stateChanged = processTelemetry(ctx, msg);
 | 
			
		||||
            stateChanged = processTelemetryRequest(ctx, msg);
 | 
			
		||||
        } else if (msg.isTypeOf(TIMESERIES_UPDATED)) {
 | 
			
		||||
            stateChanged = processTelemetryUpdatedNotification(ctx, msg);
 | 
			
		||||
        } else if (msg.isTypeOf(POST_ATTRIBUTES_REQUEST)) {
 | 
			
		||||
            stateChanged = processAttributesUpdateRequest(ctx, msg);
 | 
			
		||||
        } else if (msg.isTypeOneOf(ACTIVITY_EVENT, INACTIVITY_EVENT)) {
 | 
			
		||||
@ -179,7 +184,7 @@ class DeviceState {
 | 
			
		||||
    private boolean processDeviceActivityEvent(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
 | 
			
		||||
        String scope = msg.getMetaData().getValue(DataConstants.SCOPE);
 | 
			
		||||
        if (StringUtils.isEmpty(scope)) {
 | 
			
		||||
            return processTelemetry(ctx, msg);
 | 
			
		||||
            return processTelemetryRequest(ctx, msg);
 | 
			
		||||
        } else {
 | 
			
		||||
            return processAttributes(ctx, msg, scope);
 | 
			
		||||
        }
 | 
			
		||||
@ -266,9 +271,22 @@ class DeviceState {
 | 
			
		||||
        return stateChanged;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected boolean processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
 | 
			
		||||
    protected boolean processTelemetryRequest(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
 | 
			
		||||
        return processTelemetryUpdate(ctx, msg, JsonParser.parseString(msg.getData()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected boolean processTelemetryUpdatedNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
 | 
			
		||||
        JsonElement msgData = JsonParser.parseString(msg.getData());
 | 
			
		||||
        JsonElement telemetryData = Optional.ofNullable(JsonParser.parseString(msg.getData()))
 | 
			
		||||
                .filter(JsonElement::isJsonObject)
 | 
			
		||||
                .map(e -> e.getAsJsonObject().get("timeseries"))
 | 
			
		||||
                .orElse(msgData);
 | 
			
		||||
        return processTelemetryUpdate(ctx, msg, telemetryData);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean processTelemetryUpdate(TbContext ctx, TbMsg msg, JsonElement telemetryData) throws ExecutionException, InterruptedException {
 | 
			
		||||
        boolean stateChanged = false;
 | 
			
		||||
        Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToSortedTelemetry(JsonParser.parseString(msg.getData()), msg.getMetaDataTs());
 | 
			
		||||
        Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToSortedTelemetry(telemetryData, msg.getMetaDataTs());
 | 
			
		||||
        // iterate over data by ts (ASC order).
 | 
			
		||||
        for (Map.Entry<Long, List<KvEntry>> entry : tsKvMap.entrySet()) {
 | 
			
		||||
            Long ts = entry.getKey();
 | 
			
		||||
 | 
			
		||||
@ -2061,6 +2061,74 @@ public class TbDeviceProfileNodeTest extends AbstractRuleNodeUpgradeTest {
 | 
			
		||||
        return filter;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testAlarmCreateAfterTimeseriesUpdated() throws Exception {
 | 
			
		||||
        init();
 | 
			
		||||
 | 
			
		||||
        DeviceProfile deviceProfile = new DeviceProfile();
 | 
			
		||||
        DeviceProfileData deviceProfileData = new DeviceProfileData();
 | 
			
		||||
 | 
			
		||||
        AlarmConditionFilter highTempFilter = new AlarmConditionFilter();
 | 
			
		||||
        highTempFilter.setKey(new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, "temperature"));
 | 
			
		||||
        highTempFilter.setValueType(EntityKeyValueType.NUMERIC);
 | 
			
		||||
        NumericFilterPredicate highTemperaturePredicate = new NumericFilterPredicate();
 | 
			
		||||
        highTemperaturePredicate.setOperation(NumericFilterPredicate.NumericOperation.GREATER);
 | 
			
		||||
        highTemperaturePredicate.setValue(new FilterPredicateValue<>(30.0));
 | 
			
		||||
        highTempFilter.setPredicate(highTemperaturePredicate);
 | 
			
		||||
        AlarmCondition alarmCondition = new AlarmCondition();
 | 
			
		||||
        alarmCondition.setCondition(Collections.singletonList(highTempFilter));
 | 
			
		||||
        AlarmRule alarmRule = new AlarmRule();
 | 
			
		||||
        alarmRule.setCondition(alarmCondition);
 | 
			
		||||
        DeviceProfileAlarm dpa = new DeviceProfileAlarm();
 | 
			
		||||
        dpa.setId("highTemperatureAlarmID");
 | 
			
		||||
        dpa.setAlarmType("highTemperatureAlarm");
 | 
			
		||||
        dpa.setCreateRules(new TreeMap<>(Collections.singletonMap(AlarmSeverity.CRITICAL, alarmRule)));
 | 
			
		||||
 | 
			
		||||
        AlarmConditionFilter lowTempFilter = new AlarmConditionFilter();
 | 
			
		||||
        lowTempFilter.setKey(new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, "temperature"));
 | 
			
		||||
        lowTempFilter.setValueType(EntityKeyValueType.NUMERIC);
 | 
			
		||||
        NumericFilterPredicate lowTemperaturePredicate = new NumericFilterPredicate();
 | 
			
		||||
        lowTemperaturePredicate.setOperation(NumericFilterPredicate.NumericOperation.LESS);
 | 
			
		||||
        lowTemperaturePredicate.setValue(new FilterPredicateValue<>(10.0));
 | 
			
		||||
        lowTempFilter.setPredicate(lowTemperaturePredicate);
 | 
			
		||||
        AlarmRule clearRule = new AlarmRule();
 | 
			
		||||
        AlarmCondition clearCondition = new AlarmCondition();
 | 
			
		||||
        clearCondition.setCondition(Collections.singletonList(lowTempFilter));
 | 
			
		||||
        clearRule.setCondition(clearCondition);
 | 
			
		||||
        dpa.setClearRule(clearRule);
 | 
			
		||||
 | 
			
		||||
        deviceProfileData.setAlarms(Collections.singletonList(dpa));
 | 
			
		||||
        deviceProfile.setProfileData(deviceProfileData);
 | 
			
		||||
 | 
			
		||||
        Mockito.when(cache.get(tenantId, deviceId)).thenReturn(deviceProfile);
 | 
			
		||||
        Mockito.when(timeseriesService.findLatest(tenantId, deviceId, Collections.singleton("temperature")))
 | 
			
		||||
                .thenReturn(Futures.immediateFuture(Collections.emptyList()));
 | 
			
		||||
        Mockito.when(alarmService.findLatestActiveByOriginatorAndType(tenantId, deviceId, "highTemperatureAlarm")).thenReturn(null);
 | 
			
		||||
        registerCreateAlarmMock(alarmService.createAlarm(any()), true);
 | 
			
		||||
 | 
			
		||||
        TbMsg theMsg = TbMsg.newMsg()
 | 
			
		||||
                .type(TbMsgType.ALARM)
 | 
			
		||||
                .originator(deviceId)
 | 
			
		||||
                .copyMetaData(TbMsgMetaData.EMPTY)
 | 
			
		||||
                .data(TbMsg.EMPTY_STRING)
 | 
			
		||||
                .build();
 | 
			
		||||
        when(ctx.newMsg(any(), any(TbMsgType.class), any(), any(), any(), Mockito.anyString())).thenReturn(theMsg);
 | 
			
		||||
 | 
			
		||||
        ObjectNode data = JacksonUtil.newObjectNode();
 | 
			
		||||
        data.put("temperature", 42);
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg()
 | 
			
		||||
                .type(TbMsgType.TIMESERIES_UPDATED)
 | 
			
		||||
                .originator(deviceId)
 | 
			
		||||
                .copyMetaData(TbMsgMetaData.EMPTY)
 | 
			
		||||
                .dataType(TbMsgDataType.JSON)
 | 
			
		||||
                .data(JacksonUtil.toString(data))
 | 
			
		||||
                .build();
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
        verify(ctx).tellSuccess(msg);
 | 
			
		||||
        verify(ctx).enqueueForTellNext(theMsg, "Alarm Created");
 | 
			
		||||
        verify(ctx, Mockito.never()).tellFailure(Mockito.any(), Mockito.any());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected TbNode getTestNode() {
 | 
			
		||||
        return node;
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user