fixed TIMESERIES_UPDATED message handling

This commit is contained in:
dashevchenko 2025-03-31 16:07:42 +03:00
parent 61254a6850
commit 832af90bd9
3 changed files with 72 additions and 2 deletions

View File

@ -590,7 +590,8 @@ public class JsonConverter {
public static Map<Long, List<KvEntry>> convertToSortedTelemetry(JsonElement jsonElement, long systemTs) throws public static Map<Long, List<KvEntry>> convertToSortedTelemetry(JsonElement jsonElement, long systemTs) throws
JsonSyntaxException { JsonSyntaxException {
return convertToTelemetry(jsonElement, systemTs, true); JsonElement timeseriesElement = jsonElement.isJsonObject() ? jsonElement.getAsJsonObject().get("timeseries") : null;
return convertToTelemetry(timeseriesElement != null ? timeseriesElement : jsonElement, systemTs, true);
} }
public static Map<Long, List<KvEntry>> convertToTelemetry(JsonElement jsonElement, long systemTs, boolean sorted) throws public static Map<Long, List<KvEntry>> convertToTelemetry(JsonElement jsonElement, long systemTs, boolean sorted) throws

View File

@ -66,6 +66,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_UNASSIGNED
import static org.thingsboard.server.common.data.msg.TbMsgType.INACTIVITY_EVENT; import static org.thingsboard.server.common.data.msg.TbMsgType.INACTIVITY_EVENT;
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST; import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST;
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST; import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST;
import static org.thingsboard.server.common.data.msg.TbMsgType.TIMESERIES_UPDATED;
@Slf4j @Slf4j
class DeviceState { class DeviceState {
@ -148,7 +149,7 @@ class DeviceState {
latestValues = fetchLatestValues(ctx, deviceId); latestValues = fetchLatestValues(ctx, deviceId);
} }
boolean stateChanged = false; boolean stateChanged = false;
if (msg.isTypeOf(POST_TELEMETRY_REQUEST)) { if (msg.isTypeOf(POST_TELEMETRY_REQUEST) || msg.isTypeOf(TIMESERIES_UPDATED)) {
stateChanged = processTelemetry(ctx, msg); stateChanged = processTelemetry(ctx, msg);
} else if (msg.isTypeOf(POST_ATTRIBUTES_REQUEST)) { } else if (msg.isTypeOf(POST_ATTRIBUTES_REQUEST)) {
stateChanged = processAttributesUpdateRequest(ctx, msg); stateChanged = processAttributesUpdateRequest(ctx, msg);

View File

@ -1981,6 +1981,74 @@ public class TbDeviceProfileNodeTest extends AbstractRuleNodeUpgradeTest {
} }
@Test
public void testAlarmCreateAfterTimeseriesUpdated() throws Exception {
init();
DeviceProfile deviceProfile = new DeviceProfile();
DeviceProfileData deviceProfileData = new DeviceProfileData();
AlarmConditionFilter highTempFilter = new AlarmConditionFilter();
highTempFilter.setKey(new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, "temperature"));
highTempFilter.setValueType(EntityKeyValueType.NUMERIC);
NumericFilterPredicate highTemperaturePredicate = new NumericFilterPredicate();
highTemperaturePredicate.setOperation(NumericFilterPredicate.NumericOperation.GREATER);
highTemperaturePredicate.setValue(new FilterPredicateValue<>(30.0));
highTempFilter.setPredicate(highTemperaturePredicate);
AlarmCondition alarmCondition = new AlarmCondition();
alarmCondition.setCondition(Collections.singletonList(highTempFilter));
AlarmRule alarmRule = new AlarmRule();
alarmRule.setCondition(alarmCondition);
DeviceProfileAlarm dpa = new DeviceProfileAlarm();
dpa.setId("highTemperatureAlarmID");
dpa.setAlarmType("highTemperatureAlarm");
dpa.setCreateRules(new TreeMap<>(Collections.singletonMap(AlarmSeverity.CRITICAL, alarmRule)));
AlarmConditionFilter lowTempFilter = new AlarmConditionFilter();
lowTempFilter.setKey(new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, "temperature"));
lowTempFilter.setValueType(EntityKeyValueType.NUMERIC);
NumericFilterPredicate lowTemperaturePredicate = new NumericFilterPredicate();
lowTemperaturePredicate.setOperation(NumericFilterPredicate.NumericOperation.LESS);
lowTemperaturePredicate.setValue(new FilterPredicateValue<>(10.0));
lowTempFilter.setPredicate(lowTemperaturePredicate);
AlarmRule clearRule = new AlarmRule();
AlarmCondition clearCondition = new AlarmCondition();
clearCondition.setCondition(Collections.singletonList(lowTempFilter));
clearRule.setCondition(clearCondition);
dpa.setClearRule(clearRule);
deviceProfileData.setAlarms(Collections.singletonList(dpa));
deviceProfile.setProfileData(deviceProfileData);
Mockito.when(cache.get(tenantId, deviceId)).thenReturn(deviceProfile);
Mockito.when(timeseriesService.findLatest(tenantId, deviceId, Collections.singleton("temperature")))
.thenReturn(Futures.immediateFuture(Collections.emptyList()));
Mockito.when(alarmService.findLatestActiveByOriginatorAndType(tenantId, deviceId, "highTemperatureAlarm")).thenReturn(null);
registerCreateAlarmMock(alarmService.createAlarm(any()), true);
TbMsg theMsg = TbMsg.newMsg()
.type(TbMsgType.ALARM)
.originator(deviceId)
.copyMetaData(TbMsgMetaData.EMPTY)
.data(TbMsg.EMPTY_STRING)
.build();
when(ctx.newMsg(any(), any(TbMsgType.class), any(), any(), any(), Mockito.anyString())).thenReturn(theMsg);
ObjectNode data = JacksonUtil.newObjectNode();
data.put("temperature", 42);
TbMsg msg = TbMsg.newMsg()
.type(TbMsgType.TIMESERIES_UPDATED)
.originator(deviceId)
.copyMetaData(TbMsgMetaData.EMPTY)
.dataType(TbMsgDataType.JSON)
.data(JacksonUtil.toString(data))
.build();
node.onMsg(ctx, msg);
verify(ctx).tellSuccess(msg);
verify(ctx).enqueueForTellNext(theMsg, "Alarm Created");
verify(ctx, Mockito.never()).tellFailure(Mockito.any(), Mockito.any());
}
@Override @Override
protected TbNode getTestNode() { protected TbNode getTestNode() {
return node; return node;