From ae273114adce5dc04af29e884f8f4ddb8aa37362 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 12 Apr 2024 13:50:07 +0300 Subject: [PATCH 1/4] added tests for save timeseries node --- .../telemetry/TbMsgTimeseriesNodeTest.java | 169 ++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java new file mode 100644 index 0000000000..f3d2486027 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java @@ -0,0 +1,169 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.TenantProfileId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; +import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class TbMsgTimeseriesNodeTest { + + private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("e5095e9a-04f4-44c9-b443-1cf1b97d3384")); + private final TenantId TENANT_ID = new TenantId(UUID.fromString("c8f34868-603a-4433-876a-7d356e5cf377")); + + private TbMsgTimeseriesNode node; + private TbMsgTimeseriesNodeConfiguration config; + private long tenantProfileDefaultStorageTtl; + + @Mock + private TbContext ctxMock; + @Mock + private RuleEngineTelemetryService telemetryServiceMock; + + @BeforeEach + public void setUp() throws TbNodeException { + node = new TbMsgTimeseriesNode(); + config = new TbMsgTimeseriesNodeConfiguration().defaultConfiguration(); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + TenantProfile tenantProfile = new TenantProfile(new TenantProfileId(UUID.fromString("8c45d0fe-d437-40e9-8c31-b695b315bf40"))); + TenantProfileData tenantProfileData = new TenantProfileData(); + DefaultTenantProfileConfiguration tenantProfileConfiguration = new DefaultTenantProfileConfiguration(); + tenantProfileData.setConfiguration(tenantProfileConfiguration); + tenantProfile.setProfileData(tenantProfileData); + when(ctxMock.getTenantProfile()).thenReturn(tenantProfile); + doAnswer(invocation -> { + invocation.getArgument(0); + return null; + }).when(ctxMock).addTenantProfileListener(any()); + node.init(ctxMock, configuration); + tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(tenantProfileConfiguration.getDefaultStorageTtlDays()); + } + + @ParameterizedTest + @EnumSource(TbMsgType.class) + void givenUnsupportedMsgType_whenOnMsg_thenTellFailure(TbMsgType msgType) { + TbMsg msg = TbMsg.newMsg(msgType, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY); + + if (TbMsgType.POST_TELEMETRY_REQUEST.equals(msgType)) { + return; + } + + node.onMsg(ctxMock, msg); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctxMock).tellFailure(eq(msg), captor.capture()); + Throwable throwable = captor.getValue(); + assertThat(throwable).isInstanceOf(IllegalArgumentException.class); + assertThat(throwable.getMessage()).isEqualTo("Unsupported msg type: " + msgType); + } + + @Test + void givenEmptyMsgData_whenOnMsg_thenTellFailure() { + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY); + + node.onMsg(ctxMock, msg); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(ctxMock).tellFailure(eq(msg), captor.capture()); + Throwable throwable = captor.getValue(); + assertThat(throwable).isInstanceOf(IllegalArgumentException.class); + assertThat(throwable.getMessage()).isEqualTo("Msg body is empty: " + msg.getData()); + } + + @Test + void givenSkipLatestPersistenceIsFalse_whenOnMsg_thenSaveTimeseries() { + String data = """ + { + "temp": 45, + "humidity": 77 + } + """; + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + + when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock); + when(ctxMock.getTenantId()).thenReturn(TENANT_ID); + doAnswer(invocation -> { + TelemetryNodeCallback callback = invocation.getArgument(5); + callback.onSuccess(null); + return null; + }).when(telemetryServiceMock).saveAndNotify(any(), any(), any(), anyList(), anyLong(), any()); + + node.onMsg(ctxMock, msg); + + verify(telemetryServiceMock).saveAndNotify(eq(TENANT_ID), eq(null), eq(DEVICE_ID), anyList(), eq(tenantProfileDefaultStorageTtl), any(TelemetryNodeCallback.class)); + verify(ctxMock).tellSuccess(eq(msg)); + } + + @Test + void givenSkipLatestPersistenceIsTrue_whenOnMsg_thenSaveTimeseries() throws TbNodeException { + config.setSkipLatestPersistence(true); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctxMock, configuration); + + String data = """ + { + "temp": 45, + "humidity": 77 + } + """; + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + + when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock); + when(ctxMock.getTenantId()).thenReturn(TENANT_ID); + doAnswer(invocation -> { + TelemetryNodeCallback callback = invocation.getArgument(5); + callback.onSuccess(null); + return null; + }).when(telemetryServiceMock).saveWithoutLatestAndNotify(any(), any(), any(), anyList(), anyLong(), any()); + + node.onMsg(ctxMock, msg); + + verify(telemetryServiceMock).saveWithoutLatestAndNotify(eq(TENANT_ID), eq(null), eq(DEVICE_ID), anyList(), eq(tenantProfileDefaultStorageTtl), any(TelemetryNodeCallback.class)); + verify(ctxMock).tellSuccess(eq(msg)); + } +} From 6339f0ce633509bed1ae7805628e714c4c3deab5 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 28 May 2024 18:48:56 +0300 Subject: [PATCH 2/4] refactored tests --- .../telemetry/TbMsgTimeseriesNodeTest.java | 69 +++++++++++-------- 1 file changed, 41 insertions(+), 28 deletions(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java index f3d2486027..d4bdcae878 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.telemetry; +import com.fasterxml.jackson.databind.JsonNode; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -32,12 +33,14 @@ import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -46,15 +49,17 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) public class TbMsgTimeseriesNodeTest { + private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("c8f34868-603a-4433-876a-7d356e5cf377")); private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("e5095e9a-04f4-44c9-b443-1cf1b97d3384")); - private final TenantId TENANT_ID = new TenantId(UUID.fromString("c8f34868-603a-4433-876a-7d356e5cf377")); private TbMsgTimeseriesNode node; private TbMsgTimeseriesNodeConfiguration config; @@ -70,9 +75,9 @@ public class TbMsgTimeseriesNodeTest { node = new TbMsgTimeseriesNode(); config = new TbMsgTimeseriesNodeConfiguration().defaultConfiguration(); var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - TenantProfile tenantProfile = new TenantProfile(new TenantProfileId(UUID.fromString("8c45d0fe-d437-40e9-8c31-b695b315bf40"))); - TenantProfileData tenantProfileData = new TenantProfileData(); - DefaultTenantProfileConfiguration tenantProfileConfiguration = new DefaultTenantProfileConfiguration(); + var tenantProfile = new TenantProfile(new TenantProfileId(UUID.fromString("8c45d0fe-d437-40e9-8c31-b695b315bf40"))); + var tenantProfileData = new TenantProfileData(); + var tenantProfileConfiguration = new DefaultTenantProfileConfiguration(); tenantProfileData.setConfiguration(tenantProfileConfiguration); tenantProfile.setProfileData(tenantProfileData); when(ctxMock.getTenantProfile()).thenReturn(tenantProfile); @@ -86,33 +91,18 @@ public class TbMsgTimeseriesNodeTest { @ParameterizedTest @EnumSource(TbMsgType.class) - void givenUnsupportedMsgType_whenOnMsg_thenTellFailure(TbMsgType msgType) { + void givenMsgTypeAndEmptyMsgData_whenOnMsg_thenVerifyFailureMsg(TbMsgType msgType) { TbMsg msg = TbMsg.newMsg(msgType, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY); + node.onMsg(ctxMock, msg); + ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(ctxMock).tellFailure(eq(msg), throwableCaptor.capture()); if (TbMsgType.POST_TELEMETRY_REQUEST.equals(msgType)) { + assertThat(throwableCaptor.getValue()).isInstanceOf(IllegalArgumentException.class).hasMessage("Msg body is empty: " + msg.getData()); return; } - - node.onMsg(ctxMock, msg); - - ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); - verify(ctxMock).tellFailure(eq(msg), captor.capture()); - Throwable throwable = captor.getValue(); - assertThat(throwable).isInstanceOf(IllegalArgumentException.class); - assertThat(throwable.getMessage()).isEqualTo("Unsupported msg type: " + msgType); - } - - @Test - void givenEmptyMsgData_whenOnMsg_thenTellFailure() { - TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY); - - node.onMsg(ctxMock, msg); - - ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); - verify(ctxMock).tellFailure(eq(msg), captor.capture()); - Throwable throwable = captor.getValue(); - assertThat(throwable).isInstanceOf(IllegalArgumentException.class); - assertThat(throwable.getMessage()).isEqualTo("Msg body is empty: " + msg.getData()); + assertThat(throwableCaptor.getValue()).isInstanceOf(IllegalArgumentException.class).hasMessage("Unsupported msg type: " + msgType); + verifyNoMoreInteractions(ctxMock); } @Test @@ -135,8 +125,14 @@ public class TbMsgTimeseriesNodeTest { node.onMsg(ctxMock, msg); - verify(telemetryServiceMock).saveAndNotify(eq(TENANT_ID), eq(null), eq(DEVICE_ID), anyList(), eq(tenantProfileDefaultStorageTtl), any(TelemetryNodeCallback.class)); + ArgumentCaptor> entryListCaptor = ArgumentCaptor.forClass(List.class); + verify(telemetryServiceMock).saveAndNotify( + eq(TENANT_ID), isNull(), eq(DEVICE_ID), entryListCaptor.capture(), eq(tenantProfileDefaultStorageTtl), any(TelemetryNodeCallback.class)); + List entryListCaptorValue = entryListCaptor.getValue(); + assertThat(entryListCaptorValue.size()).isEqualTo(2); + verifyTimeseriesToSave(entryListCaptorValue, msg); verify(ctxMock).tellSuccess(eq(msg)); + verifyNoMoreInteractions(ctxMock, telemetryServiceMock); } @Test @@ -163,7 +159,24 @@ public class TbMsgTimeseriesNodeTest { node.onMsg(ctxMock, msg); - verify(telemetryServiceMock).saveWithoutLatestAndNotify(eq(TENANT_ID), eq(null), eq(DEVICE_ID), anyList(), eq(tenantProfileDefaultStorageTtl), any(TelemetryNodeCallback.class)); + ArgumentCaptor> entryListCaptor = ArgumentCaptor.forClass(List.class); + verify(telemetryServiceMock).saveWithoutLatestAndNotify( + eq(TENANT_ID), isNull(), eq(DEVICE_ID), entryListCaptor.capture(), eq(tenantProfileDefaultStorageTtl), any(TelemetryNodeCallback.class)); + List entryListCaptorValue = entryListCaptor.getValue(); + assertThat(entryListCaptorValue.size()).isEqualTo(2); + verifyTimeseriesToSave(entryListCaptorValue, msg); verify(ctxMock).tellSuccess(eq(msg)); + verifyNoMoreInteractions(ctxMock, telemetryServiceMock); } + + private void verifyTimeseriesToSave(List tsKvEntryList, TbMsg incomingMsg) { + JsonNode msgData = JacksonUtil.toJsonNode(incomingMsg.getData()); + tsKvEntryList.forEach(tsKvEntry -> { + String key = tsKvEntry.getKey(); + assertThat(msgData.has(key)).isTrue(); + String value = tsKvEntry.getValueAsString(); + assertThat(value).isEqualTo(msgData.findValue(key).asText()); + }); + } + } From 7ed5b3c1e18d5133f4b44ba4011badf98bf2dc46 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 5 Jun 2024 18:38:50 +0300 Subject: [PATCH 3/4] added tests for ttl usage --- .../telemetry/TbMsgTimeseriesNodeTest.java | 106 ++++++++++++++---- 1 file changed, 83 insertions(+), 23 deletions(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java index d4bdcae878..445f09b5ee 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java @@ -20,7 +20,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -40,9 +42,12 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -60,6 +65,7 @@ public class TbMsgTimeseriesNodeTest { private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("c8f34868-603a-4433-876a-7d356e5cf377")); private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("e5095e9a-04f4-44c9-b443-1cf1b97d3384")); + private final TenantProfileId TENANT_PROFILE_ID = new TenantProfileId(UUID.fromString("ab78dd78-83d0-43fa-869f-d42ec9ed1744")); private TbMsgTimeseriesNode node; private TbMsgTimeseriesNodeConfiguration config; @@ -74,24 +80,12 @@ public class TbMsgTimeseriesNodeTest { public void setUp() throws TbNodeException { node = new TbMsgTimeseriesNode(); config = new TbMsgTimeseriesNodeConfiguration().defaultConfiguration(); - var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - var tenantProfile = new TenantProfile(new TenantProfileId(UUID.fromString("8c45d0fe-d437-40e9-8c31-b695b315bf40"))); - var tenantProfileData = new TenantProfileData(); - var tenantProfileConfiguration = new DefaultTenantProfileConfiguration(); - tenantProfileData.setConfiguration(tenantProfileConfiguration); - tenantProfile.setProfileData(tenantProfileData); - when(ctxMock.getTenantProfile()).thenReturn(tenantProfile); - doAnswer(invocation -> { - invocation.getArgument(0); - return null; - }).when(ctxMock).addTenantProfileListener(any()); - node.init(ctxMock, configuration); - tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(tenantProfileConfiguration.getDefaultStorageTtlDays()); } @ParameterizedTest @EnumSource(TbMsgType.class) - void givenMsgTypeAndEmptyMsgData_whenOnMsg_thenVerifyFailureMsg(TbMsgType msgType) { + void givenMsgTypeAndEmptyMsgData_whenOnMsg_thenVerifyFailureMsg(TbMsgType msgType) throws TbNodeException { + init(); TbMsg msg = TbMsg.newMsg(msgType, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY); node.onMsg(ctxMock, msg); ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class); @@ -106,7 +100,9 @@ public class TbMsgTimeseriesNodeTest { } @Test - void givenSkipLatestPersistenceIsFalse_whenOnMsg_thenSaveTimeseries() { + void givenTtlFromConfigIsZeroAndUseServiceTsIsTrue_whenOnMsg_thenSaveTimeseriesUsingTenantProfileDefaultTtl() throws TbNodeException { + config.setUseServerTs(true); + init(); String data = """ { "temp": 45, @@ -126,19 +122,23 @@ public class TbMsgTimeseriesNodeTest { node.onMsg(ctxMock, msg); ArgumentCaptor> entryListCaptor = ArgumentCaptor.forClass(List.class); - verify(telemetryServiceMock).saveAndNotify( - eq(TENANT_ID), isNull(), eq(DEVICE_ID), entryListCaptor.capture(), eq(tenantProfileDefaultStorageTtl), any(TelemetryNodeCallback.class)); + verify(telemetryServiceMock).saveAndNotify(eq(TENANT_ID), isNull(), eq(DEVICE_ID), entryListCaptor.capture(), + eq(tenantProfileDefaultStorageTtl), any(TelemetryNodeCallback.class)); List entryListCaptorValue = entryListCaptor.getValue(); assertThat(entryListCaptorValue.size()).isEqualTo(2); verifyTimeseriesToSave(entryListCaptorValue, msg); - verify(ctxMock).tellSuccess(eq(msg)); + verify(ctxMock).tellSuccess(msg); verifyNoMoreInteractions(ctxMock, telemetryServiceMock); } @Test - void givenSkipLatestPersistenceIsTrue_whenOnMsg_thenSaveTimeseries() throws TbNodeException { + void givenSkipLatestPersistenceIsTrueAndTtlFromConfig_whenOnMsg_thenSaveTimeseriesUsingTtlFromConfig() throws TbNodeException { + long ttlFromConfig = 5L; + config.setDefaultTTL(ttlFromConfig); config.setSkipLatestPersistence(true); var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + var tenantProfile = getTenantProfile(); + when(ctxMock.getTenantProfile()).thenReturn(tenantProfile); node.init(ctxMock, configuration); String data = """ @@ -147,7 +147,9 @@ public class TbMsgTimeseriesNodeTest { "humidity": 77 } """; - TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + long ts = System.currentTimeMillis(); + var metadata = Map.of("ts", String.valueOf(ts)); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, new TbMsgMetaData(metadata), data); when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock); when(ctxMock.getTenantId()).thenReturn(TENANT_ID); @@ -159,19 +161,58 @@ public class TbMsgTimeseriesNodeTest { node.onMsg(ctxMock, msg); + verify(ctxMock).addTenantProfileListener(any()); ArgumentCaptor> entryListCaptor = ArgumentCaptor.forClass(List.class); verify(telemetryServiceMock).saveWithoutLatestAndNotify( - eq(TENANT_ID), isNull(), eq(DEVICE_ID), entryListCaptor.capture(), eq(tenantProfileDefaultStorageTtl), any(TelemetryNodeCallback.class)); + eq(TENANT_ID), isNull(), eq(DEVICE_ID), entryListCaptor.capture(), eq(ttlFromConfig), any(TelemetryNodeCallback.class)); List entryListCaptorValue = entryListCaptor.getValue(); assertThat(entryListCaptorValue.size()).isEqualTo(2); - verifyTimeseriesToSave(entryListCaptorValue, msg); - verify(ctxMock).tellSuccess(eq(msg)); + verifyTimeseriesToSave(entryListCaptorValue, msg, ts); + verify(ctxMock).tellSuccess(msg); verifyNoMoreInteractions(ctxMock, telemetryServiceMock); } + @ParameterizedTest + @MethodSource + void givenTtlFromConfigAndTtlFromMd_whenOnMsg_thenVerifyTtl(String ttlFromMd, long ttlFromConfig, long expectedTtl) throws TbNodeException { + config.setDefaultTTL(ttlFromConfig); + init(); + + when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock); + when(ctxMock.getTenantId()).thenReturn(TENANT_ID); + + String data = """ + { + "temp": 45, + "humidity": 77 + } + """; + var metadata = new HashMap(); + metadata.put("TTL", ttlFromMd); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, new TbMsgMetaData(metadata), data); + node.onMsg(ctxMock, msg); + + verify(telemetryServiceMock).saveAndNotify(eq(TENANT_ID), isNull(), eq(DEVICE_ID), anyList(), eq(expectedTtl), any(TelemetryNodeCallback.class)); + } + + private static Stream givenTtlFromConfigAndTtlFromMd_whenOnMsg_thenVerifyTtl() { + return Stream.of( + Arguments.of("5", 1L, 5L), + Arguments.of("", 3L, 3L), + Arguments.of(null, 8L, 8L) + ); + } + private void verifyTimeseriesToSave(List tsKvEntryList, TbMsg incomingMsg) { + verifyTimeseriesToSave(tsKvEntryList, incomingMsg, null); + } + + private void verifyTimeseriesToSave(List tsKvEntryList, TbMsg incomingMsg, Long ts) { JsonNode msgData = JacksonUtil.toJsonNode(incomingMsg.getData()); tsKvEntryList.forEach(tsKvEntry -> { + if (ts != null) { + assertThat(tsKvEntry.getTs()).isEqualTo(ts); + } String key = tsKvEntry.getKey(); assertThat(msgData.has(key)).isTrue(); String value = tsKvEntry.getValueAsString(); @@ -179,4 +220,23 @@ public class TbMsgTimeseriesNodeTest { }); } + private void init() throws TbNodeException { + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + var tenantProfile = getTenantProfile(); + when(ctxMock.getTenantProfile()).thenReturn(tenantProfile); + node.init(ctxMock, configuration); + tenantProfile.getProfileConfiguration().ifPresent(profileConfiguration -> + tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(profileConfiguration.getDefaultStorageTtlDays())); + verify(ctxMock).addTenantProfileListener(any()); + } + + private TenantProfile getTenantProfile() { + var tenantProfile = new TenantProfile(TENANT_PROFILE_ID); + var tenantProfileData = new TenantProfileData(); + var tenantProfileConfiguration = new DefaultTenantProfileConfiguration(); + tenantProfileData.setConfiguration(tenantProfileConfiguration); + tenantProfile.setProfileData(tenantProfileData); + return tenantProfile; + } + } From 4fc69299222c487e890faa8e875568fb3fb68f4b Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 7 Jun 2024 10:47:27 +0300 Subject: [PATCH 4/4] added test for default config and set different values for check logic of setting ttl --- .../telemetry/TbMsgTimeseriesNodeTest.java | 89 ++++++++++--------- 1 file changed, 48 insertions(+), 41 deletions(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java index 445f09b5ee..1fb4e9eefd 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java @@ -15,7 +15,7 @@ */ package org.thingsboard.rule.engine.telemetry; -import com.fasterxml.jackson.databind.JsonNode; +import com.google.gson.JsonParser; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -31,10 +31,13 @@ import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; @@ -42,14 +45,14 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; @@ -82,17 +85,26 @@ public class TbMsgTimeseriesNodeTest { config = new TbMsgTimeseriesNodeConfiguration().defaultConfiguration(); } + @Test + public void verifyDefaultConfig() { + assertThat(config.getDefaultTTL()).isEqualTo(0L); + assertThat(config.isSkipLatestPersistence()).isFalse(); + assertThat(config.isUseServerTs()).isFalse(); + } + @ParameterizedTest @EnumSource(TbMsgType.class) - void givenMsgTypeAndEmptyMsgData_whenOnMsg_thenVerifyFailureMsg(TbMsgType msgType) throws TbNodeException { + public void givenMsgTypeAndEmptyMsgData_whenOnMsg_thenVerifyFailureMsg(TbMsgType msgType) throws TbNodeException { init(); TbMsg msg = TbMsg.newMsg(msgType, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_ARRAY); node.onMsg(ctxMock, msg); + ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class); verify(ctxMock).tellFailure(eq(msg), throwableCaptor.capture()); if (TbMsgType.POST_TELEMETRY_REQUEST.equals(msgType)) { assertThat(throwableCaptor.getValue()).isInstanceOf(IllegalArgumentException.class).hasMessage("Msg body is empty: " + msg.getData()); + verifyNoMoreInteractions(ctxMock); return; } assertThat(throwableCaptor.getValue()).isInstanceOf(IllegalArgumentException.class).hasMessage("Unsupported msg type: " + msgType); @@ -100,9 +112,10 @@ public class TbMsgTimeseriesNodeTest { } @Test - void givenTtlFromConfigIsZeroAndUseServiceTsIsTrue_whenOnMsg_thenSaveTimeseriesUsingTenantProfileDefaultTtl() throws TbNodeException { + public void givenTtlFromConfigIsZeroAndUseServiceTsIsTrue_whenOnMsg_thenSaveTimeseriesUsingTenantProfileDefaultTtl() throws TbNodeException { config.setUseServerTs(true); init(); + String data = """ { "temp": 45, @@ -121,25 +134,22 @@ public class TbMsgTimeseriesNodeTest { node.onMsg(ctxMock, msg); + List expectedList = getTsKvEntriesListWithTs(data, System.currentTimeMillis()); ArgumentCaptor> entryListCaptor = ArgumentCaptor.forClass(List.class); verify(telemetryServiceMock).saveAndNotify(eq(TENANT_ID), isNull(), eq(DEVICE_ID), entryListCaptor.capture(), eq(tenantProfileDefaultStorageTtl), any(TelemetryNodeCallback.class)); - List entryListCaptorValue = entryListCaptor.getValue(); - assertThat(entryListCaptorValue.size()).isEqualTo(2); - verifyTimeseriesToSave(entryListCaptorValue, msg); + assertThat(entryListCaptor.getValue()).usingRecursiveFieldByFieldElementComparatorIgnoringFields("ts") + .containsExactlyElementsOf(expectedList); verify(ctxMock).tellSuccess(msg); verifyNoMoreInteractions(ctxMock, telemetryServiceMock); } @Test - void givenSkipLatestPersistenceIsTrueAndTtlFromConfig_whenOnMsg_thenSaveTimeseriesUsingTtlFromConfig() throws TbNodeException { + public void givenSkipLatestPersistenceIsTrueAndTtlFromConfig_whenOnMsg_thenSaveTimeseriesUsingTtlFromConfig() throws TbNodeException { long ttlFromConfig = 5L; config.setDefaultTTL(ttlFromConfig); config.setSkipLatestPersistence(true); - var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - var tenantProfile = getTenantProfile(); - when(ctxMock.getTenantProfile()).thenReturn(tenantProfile); - node.init(ctxMock, configuration); + init(); String data = """ { @@ -161,20 +171,18 @@ public class TbMsgTimeseriesNodeTest { node.onMsg(ctxMock, msg); - verify(ctxMock).addTenantProfileListener(any()); + List expectedList = getTsKvEntriesListWithTs(data, ts); ArgumentCaptor> entryListCaptor = ArgumentCaptor.forClass(List.class); verify(telemetryServiceMock).saveWithoutLatestAndNotify( eq(TENANT_ID), isNull(), eq(DEVICE_ID), entryListCaptor.capture(), eq(ttlFromConfig), any(TelemetryNodeCallback.class)); - List entryListCaptorValue = entryListCaptor.getValue(); - assertThat(entryListCaptorValue.size()).isEqualTo(2); - verifyTimeseriesToSave(entryListCaptorValue, msg, ts); + assertThat(entryListCaptor.getValue()).containsExactlyElementsOf(expectedList); verify(ctxMock).tellSuccess(msg); verifyNoMoreInteractions(ctxMock, telemetryServiceMock); } @ParameterizedTest @MethodSource - void givenTtlFromConfigAndTtlFromMd_whenOnMsg_thenVerifyTtl(String ttlFromMd, long ttlFromConfig, long expectedTtl) throws TbNodeException { + public void givenTtlFromConfigAndTtlFromMd_whenOnMsg_thenVerifyTtl(String ttlFromMd, long ttlFromConfig, long expectedTtl) throws TbNodeException { config.setDefaultTTL(ttlFromConfig); init(); @@ -187,9 +195,9 @@ public class TbMsgTimeseriesNodeTest { "humidity": 77 } """; - var metadata = new HashMap(); - metadata.put("TTL", ttlFromMd); - TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, new TbMsgMetaData(metadata), data); + var metadata = new TbMsgMetaData(); + metadata.putValue("TTL", ttlFromMd); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metadata, data); node.onMsg(ctxMock, msg); verify(telemetryServiceMock).saveAndNotify(eq(TENANT_ID), isNull(), eq(DEVICE_ID), anyList(), eq(expectedTtl), any(TelemetryNodeCallback.class)); @@ -197,36 +205,23 @@ public class TbMsgTimeseriesNodeTest { private static Stream givenTtlFromConfigAndTtlFromMd_whenOnMsg_thenVerifyTtl() { return Stream.of( - Arguments.of("5", 1L, 5L), + // when ttl is present in metadata and it is not zero then ttl = ttl from metadata + Arguments.of("1", 2L, 1L), + // when ttl is absent in metadata and present in config and it is not zero then ttl = ttl from config Arguments.of("", 3L, 3L), - Arguments.of(null, 8L, 8L) + Arguments.of(null, 4L, 4L), + // when ttl is present in metadata or config but it is zero then ttl = default ttl from tenant profile + Arguments.of("0", 0L, TimeUnit.DAYS.toSeconds(5L)) ); } - private void verifyTimeseriesToSave(List tsKvEntryList, TbMsg incomingMsg) { - verifyTimeseriesToSave(tsKvEntryList, incomingMsg, null); - } - - private void verifyTimeseriesToSave(List tsKvEntryList, TbMsg incomingMsg, Long ts) { - JsonNode msgData = JacksonUtil.toJsonNode(incomingMsg.getData()); - tsKvEntryList.forEach(tsKvEntry -> { - if (ts != null) { - assertThat(tsKvEntry.getTs()).isEqualTo(ts); - } - String key = tsKvEntry.getKey(); - assertThat(msgData.has(key)).isTrue(); - String value = tsKvEntry.getValueAsString(); - assertThat(value).isEqualTo(msgData.findValue(key).asText()); - }); - } - private void init() throws TbNodeException { var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); var tenantProfile = getTenantProfile(); when(ctxMock.getTenantProfile()).thenReturn(tenantProfile); - node.init(ctxMock, configuration); tenantProfile.getProfileConfiguration().ifPresent(profileConfiguration -> tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(profileConfiguration.getDefaultStorageTtlDays())); + node.init(ctxMock, configuration); verify(ctxMock).addTenantProfileListener(any()); } @@ -234,9 +229,21 @@ public class TbMsgTimeseriesNodeTest { var tenantProfile = new TenantProfile(TENANT_PROFILE_ID); var tenantProfileData = new TenantProfileData(); var tenantProfileConfiguration = new DefaultTenantProfileConfiguration(); + tenantProfileConfiguration.setDefaultStorageTtlDays(5); tenantProfileData.setConfiguration(tenantProfileConfiguration); tenantProfile.setProfileData(tenantProfileData); return tenantProfile; } + private static List getTsKvEntriesListWithTs(String data, long ts) { + Map> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(data), ts); + List expectedList = new ArrayList<>(); + for (Map.Entry> tsKvEntry : tsKvMap.entrySet()) { + for (KvEntry kvEntry : tsKvEntry.getValue()) { + expectedList.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry)); + } + } + return expectedList; + } + }