diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java index 6cc44e9e8b..193c179df2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.profile; +import com.google.gson.JsonElement; import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; @@ -49,6 +50,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -66,6 +68,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_UNASSIGNED import static org.thingsboard.server.common.data.msg.TbMsgType.INACTIVITY_EVENT; import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST; import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST; +import static org.thingsboard.server.common.data.msg.TbMsgType.TIMESERIES_UPDATED; @Slf4j class DeviceState { @@ -149,7 +152,9 @@ class DeviceState { } boolean stateChanged = false; if (msg.isTypeOf(POST_TELEMETRY_REQUEST)) { - stateChanged = processTelemetry(ctx, msg); + stateChanged = processTelemetryRequest(ctx, msg); + } else if (msg.isTypeOf(TIMESERIES_UPDATED)) { + stateChanged = processTelemetryUpdatedNotification(ctx, msg); } else if (msg.isTypeOf(POST_ATTRIBUTES_REQUEST)) { stateChanged = processAttributesUpdateRequest(ctx, msg); } else if (msg.isTypeOneOf(ACTIVITY_EVENT, INACTIVITY_EVENT)) { @@ -179,7 +184,7 @@ class DeviceState { private boolean processDeviceActivityEvent(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { String scope = msg.getMetaData().getValue(DataConstants.SCOPE); if (StringUtils.isEmpty(scope)) { - return processTelemetry(ctx, msg); + return processTelemetryRequest(ctx, msg); } else { return processAttributes(ctx, msg, scope); } @@ -266,9 +271,22 @@ class DeviceState { return stateChanged; } - protected boolean processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + protected boolean processTelemetryRequest(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + return processTelemetryUpdate(ctx, msg, JsonParser.parseString(msg.getData())); + } + + protected boolean processTelemetryUpdatedNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + JsonElement msgData = JsonParser.parseString(msg.getData()); + JsonElement telemetryData = Optional.ofNullable(JsonParser.parseString(msg.getData())) + .filter(JsonElement::isJsonObject) + .map(e -> e.getAsJsonObject().get("timeseries")) + .orElse(msgData); + return processTelemetryUpdate(ctx, msg, telemetryData); + } + + private boolean processTelemetryUpdate(TbContext ctx, TbMsg msg, JsonElement telemetryData) throws ExecutionException, InterruptedException { boolean stateChanged = false; - Map> tsKvMap = JsonConverter.convertToSortedTelemetry(JsonParser.parseString(msg.getData()), msg.getMetaDataTs()); + Map> tsKvMap = JsonConverter.convertToSortedTelemetry(telemetryData, msg.getMetaDataTs()); // iterate over data by ts (ASC order). for (Map.Entry> entry : tsKvMap.entrySet()) { Long ts = entry.getKey(); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java index 5163e21c05..90c35e59b1 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java @@ -2061,6 +2061,74 @@ public class TbDeviceProfileNodeTest extends AbstractRuleNodeUpgradeTest { return filter; } + @Test + public void testAlarmCreateAfterTimeseriesUpdated() throws Exception { + init(); + + DeviceProfile deviceProfile = new DeviceProfile(); + DeviceProfileData deviceProfileData = new DeviceProfileData(); + + AlarmConditionFilter highTempFilter = new AlarmConditionFilter(); + highTempFilter.setKey(new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, "temperature")); + highTempFilter.setValueType(EntityKeyValueType.NUMERIC); + NumericFilterPredicate highTemperaturePredicate = new NumericFilterPredicate(); + highTemperaturePredicate.setOperation(NumericFilterPredicate.NumericOperation.GREATER); + highTemperaturePredicate.setValue(new FilterPredicateValue<>(30.0)); + highTempFilter.setPredicate(highTemperaturePredicate); + AlarmCondition alarmCondition = new AlarmCondition(); + alarmCondition.setCondition(Collections.singletonList(highTempFilter)); + AlarmRule alarmRule = new AlarmRule(); + alarmRule.setCondition(alarmCondition); + DeviceProfileAlarm dpa = new DeviceProfileAlarm(); + dpa.setId("highTemperatureAlarmID"); + dpa.setAlarmType("highTemperatureAlarm"); + dpa.setCreateRules(new TreeMap<>(Collections.singletonMap(AlarmSeverity.CRITICAL, alarmRule))); + + AlarmConditionFilter lowTempFilter = new AlarmConditionFilter(); + lowTempFilter.setKey(new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, "temperature")); + lowTempFilter.setValueType(EntityKeyValueType.NUMERIC); + NumericFilterPredicate lowTemperaturePredicate = new NumericFilterPredicate(); + lowTemperaturePredicate.setOperation(NumericFilterPredicate.NumericOperation.LESS); + lowTemperaturePredicate.setValue(new FilterPredicateValue<>(10.0)); + lowTempFilter.setPredicate(lowTemperaturePredicate); + AlarmRule clearRule = new AlarmRule(); + AlarmCondition clearCondition = new AlarmCondition(); + clearCondition.setCondition(Collections.singletonList(lowTempFilter)); + clearRule.setCondition(clearCondition); + dpa.setClearRule(clearRule); + + deviceProfileData.setAlarms(Collections.singletonList(dpa)); + deviceProfile.setProfileData(deviceProfileData); + + Mockito.when(cache.get(tenantId, deviceId)).thenReturn(deviceProfile); + Mockito.when(timeseriesService.findLatest(tenantId, deviceId, Collections.singleton("temperature"))) + .thenReturn(Futures.immediateFuture(Collections.emptyList())); + Mockito.when(alarmService.findLatestActiveByOriginatorAndType(tenantId, deviceId, "highTemperatureAlarm")).thenReturn(null); + registerCreateAlarmMock(alarmService.createAlarm(any()), true); + + TbMsg theMsg = TbMsg.newMsg() + .type(TbMsgType.ALARM) + .originator(deviceId) + .copyMetaData(TbMsgMetaData.EMPTY) + .data(TbMsg.EMPTY_STRING) + .build(); + when(ctx.newMsg(any(), any(TbMsgType.class), any(), any(), any(), Mockito.anyString())).thenReturn(theMsg); + + ObjectNode data = JacksonUtil.newObjectNode(); + data.put("temperature", 42); + TbMsg msg = TbMsg.newMsg() + .type(TbMsgType.TIMESERIES_UPDATED) + .originator(deviceId) + .copyMetaData(TbMsgMetaData.EMPTY) + .dataType(TbMsgDataType.JSON) + .data(JacksonUtil.toString(data)) + .build(); + node.onMsg(ctx, msg); + verify(ctx).tellSuccess(msg); + verify(ctx).enqueueForTellNext(theMsg, "Alarm Created"); + verify(ctx, Mockito.never()).tellFailure(Mockito.any(), Mockito.any()); + } + @Override protected TbNode getTestNode() { return node;