From 832af90bd904c9c1fa9b04a2ffb8db168bc838cf Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Mon, 31 Mar 2025 16:07:42 +0300 Subject: [PATCH 1/3] fixed TIMESERIES_UPDATED message handling --- .../server/common/adaptor/JsonConverter.java | 3 +- .../rule/engine/profile/DeviceState.java | 3 +- .../profile/TbDeviceProfileNodeTest.java | 68 +++++++++++++++++++ 3 files changed, 72 insertions(+), 2 deletions(-) diff --git a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java index 2a208923d9..f1575f8a1c 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java @@ -590,7 +590,8 @@ public class JsonConverter { public static Map> convertToSortedTelemetry(JsonElement jsonElement, long systemTs) throws JsonSyntaxException { - return convertToTelemetry(jsonElement, systemTs, true); + JsonElement timeseriesElement = jsonElement.isJsonObject() ? jsonElement.getAsJsonObject().get("timeseries") : null; + return convertToTelemetry(timeseriesElement != null ? timeseriesElement : jsonElement, systemTs, true); } public static Map> convertToTelemetry(JsonElement jsonElement, long systemTs, boolean sorted) throws diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java index 6cc44e9e8b..bbce5e5dfe 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java @@ -66,6 +66,7 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_UNASSIGNED import static org.thingsboard.server.common.data.msg.TbMsgType.INACTIVITY_EVENT; import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST; import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST; +import static org.thingsboard.server.common.data.msg.TbMsgType.TIMESERIES_UPDATED; @Slf4j class DeviceState { @@ -148,7 +149,7 @@ class DeviceState { latestValues = fetchLatestValues(ctx, deviceId); } boolean stateChanged = false; - if (msg.isTypeOf(POST_TELEMETRY_REQUEST)) { + if (msg.isTypeOf(POST_TELEMETRY_REQUEST) || msg.isTypeOf(TIMESERIES_UPDATED)) { stateChanged = processTelemetry(ctx, msg); } else if (msg.isTypeOf(POST_ATTRIBUTES_REQUEST)) { stateChanged = processAttributesUpdateRequest(ctx, msg); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java index 7ecb1b2ad8..9c87f506b3 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java @@ -1981,6 +1981,74 @@ public class TbDeviceProfileNodeTest extends AbstractRuleNodeUpgradeTest { } + @Test + public void testAlarmCreateAfterTimeseriesUpdated() throws Exception { + init(); + + DeviceProfile deviceProfile = new DeviceProfile(); + DeviceProfileData deviceProfileData = new DeviceProfileData(); + + AlarmConditionFilter highTempFilter = new AlarmConditionFilter(); + highTempFilter.setKey(new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, "temperature")); + highTempFilter.setValueType(EntityKeyValueType.NUMERIC); + NumericFilterPredicate highTemperaturePredicate = new NumericFilterPredicate(); + highTemperaturePredicate.setOperation(NumericFilterPredicate.NumericOperation.GREATER); + highTemperaturePredicate.setValue(new FilterPredicateValue<>(30.0)); + highTempFilter.setPredicate(highTemperaturePredicate); + AlarmCondition alarmCondition = new AlarmCondition(); + alarmCondition.setCondition(Collections.singletonList(highTempFilter)); + AlarmRule alarmRule = new AlarmRule(); + alarmRule.setCondition(alarmCondition); + DeviceProfileAlarm dpa = new DeviceProfileAlarm(); + dpa.setId("highTemperatureAlarmID"); + dpa.setAlarmType("highTemperatureAlarm"); + dpa.setCreateRules(new TreeMap<>(Collections.singletonMap(AlarmSeverity.CRITICAL, alarmRule))); + + AlarmConditionFilter lowTempFilter = new AlarmConditionFilter(); + lowTempFilter.setKey(new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, "temperature")); + lowTempFilter.setValueType(EntityKeyValueType.NUMERIC); + NumericFilterPredicate lowTemperaturePredicate = new NumericFilterPredicate(); + lowTemperaturePredicate.setOperation(NumericFilterPredicate.NumericOperation.LESS); + lowTemperaturePredicate.setValue(new FilterPredicateValue<>(10.0)); + lowTempFilter.setPredicate(lowTemperaturePredicate); + AlarmRule clearRule = new AlarmRule(); + AlarmCondition clearCondition = new AlarmCondition(); + clearCondition.setCondition(Collections.singletonList(lowTempFilter)); + clearRule.setCondition(clearCondition); + dpa.setClearRule(clearRule); + + deviceProfileData.setAlarms(Collections.singletonList(dpa)); + deviceProfile.setProfileData(deviceProfileData); + + Mockito.when(cache.get(tenantId, deviceId)).thenReturn(deviceProfile); + Mockito.when(timeseriesService.findLatest(tenantId, deviceId, Collections.singleton("temperature"))) + .thenReturn(Futures.immediateFuture(Collections.emptyList())); + Mockito.when(alarmService.findLatestActiveByOriginatorAndType(tenantId, deviceId, "highTemperatureAlarm")).thenReturn(null); + registerCreateAlarmMock(alarmService.createAlarm(any()), true); + + TbMsg theMsg = TbMsg.newMsg() + .type(TbMsgType.ALARM) + .originator(deviceId) + .copyMetaData(TbMsgMetaData.EMPTY) + .data(TbMsg.EMPTY_STRING) + .build(); + when(ctx.newMsg(any(), any(TbMsgType.class), any(), any(), any(), Mockito.anyString())).thenReturn(theMsg); + + ObjectNode data = JacksonUtil.newObjectNode(); + data.put("temperature", 42); + TbMsg msg = TbMsg.newMsg() + .type(TbMsgType.TIMESERIES_UPDATED) + .originator(deviceId) + .copyMetaData(TbMsgMetaData.EMPTY) + .dataType(TbMsgDataType.JSON) + .data(JacksonUtil.toString(data)) + .build(); + node.onMsg(ctx, msg); + verify(ctx).tellSuccess(msg); + verify(ctx).enqueueForTellNext(theMsg, "Alarm Created"); + verify(ctx, Mockito.never()).tellFailure(Mockito.any(), Mockito.any()); + } + @Override protected TbNode getTestNode() { return node; From b2452af6ce92e6c10bcbf98f7578c35372a970ba Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Wed, 21 May 2025 15:18:22 +0300 Subject: [PATCH 2/3] splitted logic for POST_TELEMETRY_REQUEST and TIMESERIES_UPDATED --- .../server/common/adaptor/JsonConverter.java | 4 +-- .../rule/engine/profile/DeviceState.java | 27 +++++++++++++++---- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java index f1575f8a1c..a504c287b8 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java @@ -60,6 +60,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.function.Consumer; @@ -590,8 +591,7 @@ public class JsonConverter { public static Map> convertToSortedTelemetry(JsonElement jsonElement, long systemTs) throws JsonSyntaxException { - JsonElement timeseriesElement = jsonElement.isJsonObject() ? jsonElement.getAsJsonObject().get("timeseries") : null; - return convertToTelemetry(timeseriesElement != null ? timeseriesElement : jsonElement, systemTs, true); + return convertToTelemetry(jsonElement, systemTs, true); } public static Map> convertToTelemetry(JsonElement jsonElement, long systemTs, boolean sorted) throws diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java index bbce5e5dfe..193c179df2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.profile; +import com.google.gson.JsonElement; import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; @@ -49,6 +50,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -149,8 +151,10 @@ class DeviceState { latestValues = fetchLatestValues(ctx, deviceId); } boolean stateChanged = false; - if (msg.isTypeOf(POST_TELEMETRY_REQUEST) || msg.isTypeOf(TIMESERIES_UPDATED)) { - stateChanged = processTelemetry(ctx, msg); + if (msg.isTypeOf(POST_TELEMETRY_REQUEST)) { + stateChanged = processTelemetryRequest(ctx, msg); + } else if (msg.isTypeOf(TIMESERIES_UPDATED)) { + stateChanged = processTelemetryUpdatedNotification(ctx, msg); } else if (msg.isTypeOf(POST_ATTRIBUTES_REQUEST)) { stateChanged = processAttributesUpdateRequest(ctx, msg); } else if (msg.isTypeOneOf(ACTIVITY_EVENT, INACTIVITY_EVENT)) { @@ -180,7 +184,7 @@ class DeviceState { private boolean processDeviceActivityEvent(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { String scope = msg.getMetaData().getValue(DataConstants.SCOPE); if (StringUtils.isEmpty(scope)) { - return processTelemetry(ctx, msg); + return processTelemetryRequest(ctx, msg); } else { return processAttributes(ctx, msg, scope); } @@ -267,9 +271,22 @@ class DeviceState { return stateChanged; } - protected boolean processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + protected boolean processTelemetryRequest(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + return processTelemetryUpdate(ctx, msg, JsonParser.parseString(msg.getData())); + } + + protected boolean processTelemetryUpdatedNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + JsonElement msgData = JsonParser.parseString(msg.getData()); + JsonElement telemetryData = Optional.ofNullable(JsonParser.parseString(msg.getData())) + .filter(JsonElement::isJsonObject) + .map(e -> e.getAsJsonObject().get("timeseries")) + .orElse(msgData); + return processTelemetryUpdate(ctx, msg, telemetryData); + } + + private boolean processTelemetryUpdate(TbContext ctx, TbMsg msg, JsonElement telemetryData) throws ExecutionException, InterruptedException { boolean stateChanged = false; - Map> tsKvMap = JsonConverter.convertToSortedTelemetry(JsonParser.parseString(msg.getData()), msg.getMetaDataTs()); + Map> tsKvMap = JsonConverter.convertToSortedTelemetry(telemetryData, msg.getMetaDataTs()); // iterate over data by ts (ASC order). for (Map.Entry> entry : tsKvMap.entrySet()) { Long ts = entry.getKey(); From 6e5e6b064743e5c1b94eb4eed921344eba8bfa74 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Wed, 21 May 2025 15:19:38 +0300 Subject: [PATCH 3/3] clean imports --- .../org/thingsboard/server/common/adaptor/JsonConverter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java index a504c287b8..2a208923d9 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java @@ -60,7 +60,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.function.Consumer;