From 5c6cd2dda89bc2d4b3b2f83a9bbff6f4f26bfae8 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 5 Nov 2024 20:18:15 +0100 Subject: [PATCH] added new debug strategy ALL_THEN_ONLY_FAILURE_EVENTS --- .../actors/ruleChain/DefaultTbContext.java | 14 +- .../RuleChainActorMessageProcessor.java | 2 +- .../actors/rule/DefaultTbContextTest.java | 192 +++++++++++++++++- .../common/data/rule/DebugStrategy.java | 28 +-- common/edge-api/src/main/proto/edge.proto | 3 +- 5 files changed, 212 insertions(+), 27 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index bd57f7da38..119d1d2900 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -1005,19 +1005,19 @@ public class DefaultTbContext implements TbContext { DebugStrategy debugStrategy = ruleNode.getDebugStrategy(); if (debugStrategy.shouldPersistDebugOutputForAllEvents(ruleNode.getLastUpdateTs(), msg.getTs(), getMaxRuleNodeDebugDurationMinutes())) { relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(getTenantId(), ruleNode.getId(), msg, relationType, error, failureMessage)); - } else if (debugStrategy.shouldPersistDebugForFailureEventOnly(relationTypes)) { + } else if (debugStrategy.shouldPersistDebugForFailureEvent(relationTypes)) { mainCtx.persistDebugOutput(getTenantId(), ruleNode.getId(), msg, TbNodeConnectionType.FAILURE, error, failureMessage); } } private int getMaxRuleNodeDebugDurationMinutes() { - if (!DebugStrategy.ALL_EVENTS.equals(nodeCtx.getSelf().getDebugStrategy())) { - return 0; + if (nodeCtx.getSelf().getDebugStrategy().isHasDuration()) { + var configuration = mainCtx.getTenantProfileCache() + .get(getTenantId()).getProfileData().getConfiguration(); + int systemMaxRuleNodeDebugModeDurationMinutes = mainCtx.getMaxRuleNodeDebugModeDurationMinutes(); + return configuration.getMaxRuleNodeDebugModeDurationMinutes(systemMaxRuleNodeDebugModeDurationMinutes); } - var configuration = mainCtx.getTenantProfileCache() - .get(getTenantId()).getProfileData().getConfiguration(); - int systemMaxRuleNodeDebugModeDurationMinutes = mainCtx.getMaxRuleNodeDebugModeDurationMinutes(); - return configuration.getMaxRuleNodeDebugModeDurationMinutes(systemMaxRuleNodeDebugModeDurationMinutes); + return 0; } } diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 2213daa770..01caa50860 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -260,7 +260,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor givenDebugStrategyAllThenOnlyFailureEventsAndConnection_whenTellNext_thenVerifyDebugOutputPersisted() { + return failureAndSuccessConnection(); + } + + @Test + public void givenDebugStrategyAllThenOnlyEventsAndFailureAndSuccessConnection_whenTellNext_thenVerifyDebugOutputPersistedForAllEvents() { + // GIVEN + var callbackMock = mock(TbMsgCallback.class); + var msg = getTbMsgWithCallback(callbackMock); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setLastUpdateTs(System.currentTimeMillis()); + ruleNode.setDebugStrategy(DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + mockGetMaxRuleNodeDebugModeDurationMinutes(); + + // WHEN + Set connections = failureAndSuccessConnection().collect(Collectors.toSet()); + defaultTbContext.tellNext(msg, connections); + + // THEN + then(nodeCtxMock).should().getChainActor(); + then(nodeCtxMock).shouldHaveNoMoreInteractions(); + then(mainCtxMock).should().getTenantProfileCache(); + then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes(); + var nodeConnectionsCaptor = ArgumentCaptor.forClass(String.class); + int wantedNumberOfInvocations = connections.size(); + then(mainCtxMock).should(times(wantedNumberOfInvocations)).persistDebugOutput(eq(TENANT_ID), eq(RULE_NODE_ID), eq(msg), nodeConnectionsCaptor.capture(), nullable(Throwable.class), nullable(String.class)); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + assertThat(nodeConnectionsCaptor.getAllValues()).hasSize(wantedNumberOfInvocations); + assertThat(nodeConnectionsCaptor.getAllValues()).containsExactlyInAnyOrderElementsOf(connections); + checkTellNextCommonLogic(callbackMock, connections, msg); + } + private static Stream failureAndSuccessConnection() { return Stream.of(TbNodeConnectionType.FAILURE, TbNodeConnectionType.SUCCESS); } @@ -361,6 +427,32 @@ class DefaultTbContextTest { then(nodeCtxMock).shouldHaveNoMoreInteractions(); } + @ParameterizedTest + @ValueSource(strings = {TbNodeConnectionType.SUCCESS, TbNodeConnectionType.FAILURE}) + void givenDebugStrategyAllThenOnlyFailureEvents_whenOutput_thenVerifyDebugOutputPersisted(String nodeConnection) { + // GIVEN + var msgMock = mock(TbMsg.class); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.ALL_EVENTS); + given(msgMock.popFormStack()).willReturn(new TbMsgProcessingStackItem(RULE_CHAIN_ID, RULE_NODE_ID)); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + mockGetMaxRuleNodeDebugModeDurationMinutes(); + + // WHEN + defaultTbContext.output(msgMock, nodeConnection); + + // THEN + checkOutputCommonLogic(msgMock, nodeConnection); + then(mainCtxMock).should().getTenantProfileCache(); + then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes(); + then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msgMock, nodeConnection, null, null); + then(mainCtxMock).shouldHaveNoMoreInteractions(); + then(nodeCtxMock).shouldHaveNoMoreInteractions(); + } + @Test public void givenEmptyStack_whenOutput_thenVerifyMsgAck() { // GIVEN @@ -409,6 +501,32 @@ class DefaultTbContextTest { then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msgMock, TbNodeConnectionType.ACK, null, null); } + @Test + public void givenEmptyStackAndDebugStrategyAllThenOnlyFailureEvents_whenOutput_thenVerifyMsgAckAndDebugOutputPersisted() { + // GIVEN + var msgMock = mock(TbMsg.class); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setDebugStrategy(DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS); + ruleNode.setLastUpdateTs(System.currentTimeMillis()); + given(msgMock.popFormStack()).willReturn(null); + TbMsgCallback callbackMock = mock(TbMsgCallback.class); + given(msgMock.getCallback()).willReturn(callbackMock); + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + mockGetMaxRuleNodeDebugModeDurationMinutes(); + + // WHEN + defaultTbContext.output(msgMock, TbNodeConnectionType.SUCCESS); + + // THEN + then(msgMock).should().popFormStack(); + then(callbackMock).should().onProcessingEnd(RULE_NODE_ID); + then(callbackMock).should().onSuccess(); + then(nodeCtxMock).should(never()).getChainActor(); + then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msgMock, TbNodeConnectionType.ACK, null, null); + } + @Test public void givenDebugStrategyOnlyFailureEvents_whenEnqueueForTellFailure_thenVerifyDebugOutputPersisted() { // GIVEN @@ -551,7 +669,7 @@ class DefaultTbContextTest { given(nodeCtxMock.getSelf()).willReturn(ruleNode); given(mainCtxMock.resolve(any(ServiceType.class), anyString(), any(TenantId.class), any(EntityId.class))).willReturn(tpi); given(mainCtxMock.getClusterService()).willReturn(tbClusterServiceMock); - if (DebugStrategy.ALL_EVENTS.equals(debugStrategy)) { + if (DebugStrategy.ALL_EVENTS.equals(debugStrategy) || DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS.equals(debugStrategy)) { mockGetMaxRuleNodeDebugModeDurationMinutes(); } @@ -579,7 +697,7 @@ class DefaultTbContextTest { assertThat(simpleTbQueueCallback).isNotNull(); simpleTbQueueCallback.onSuccess(null); - if (DebugStrategy.ALL_EVENTS.equals(debugStrategy)) { + if (DebugStrategy.ALL_EVENTS.equals(debugStrategy) || DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS.equals(debugStrategy)) { then(mainCtxMock).should().getTenantProfileCache(); then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes(); ArgumentCaptor tbMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); @@ -611,7 +729,7 @@ class DefaultTbContextTest { given(nodeCtxMock.getSelf()).willReturn(ruleNode); given(mainCtxMock.resolve(any(ServiceType.class), anyString(), any(TenantId.class), any(EntityId.class))).willReturn(tpi); given(mainCtxMock.getClusterService()).willReturn(tbClusterServiceMock); - if (DebugStrategy.ALL_EVENTS.equals(debugStrategy)) { + if (DebugStrategy.ALL_EVENTS.equals(debugStrategy) || DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS.equals(debugStrategy)) { mockGetMaxRuleNodeDebugModeDurationMinutes(); } @@ -642,7 +760,7 @@ class DefaultTbContextTest { assertThat(simpleTbQueueCallback).isNotNull(); simpleTbQueueCallback.onSuccess(null); - if (DebugStrategy.ALL_EVENTS.equals(debugStrategy)) { + if (debugStrategy.isHasDuration()) { then(mainCtxMock).should().getTenantProfileCache(); then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes(); then(mainCtxMock).should().persistDebugOutput(eq(TENANT_ID), eq(RULE_NODE_ID), eq(msg), eq(TbNodeConnectionType.TO_ROOT_RULE_CHAIN), nullable(Throwable.class), nullable(String.class)); @@ -744,6 +862,51 @@ class DefaultTbContextTest { then(nodeCtxMock).shouldHaveNoMoreInteractions(); } + @MethodSource + @ParameterizedTest + void givenDebugStrategyAndConnectionAndPersistedResultOptions_whenTellNext_thenVerifyDebugOutputPersistence(DebugStrategy debugStrategy, + String connection, + boolean shouldPersist, + boolean shouldPersistAfterDurationTime) { + // GIVEN + var callbackMock = mock(TbMsgCallback.class); + var msg = getTbMsgWithCallback(callbackMock); + var ruleNode = new RuleNode(RULE_NODE_ID); + ruleNode.setRuleChainId(RULE_CHAIN_ID); + ruleNode.setLastUpdateTs(System.currentTimeMillis()); + ruleNode.setDebugStrategy(debugStrategy); + if (shouldPersist) { + given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID); + } + given(nodeCtxMock.getSelf()).willReturn(ruleNode); + given(nodeCtxMock.getChainActor()).willReturn(chainActorMock); + if (debugStrategy.isHasDuration()) { + mockGetMaxRuleNodeDebugModeDurationMinutes(); + } + + // WHEN + defaultTbContext.tellNext(msg, connection); + + // THEN + if (shouldPersist) { + then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msg, connection, null, null); + } + + // GIVEN + Mockito.clearInvocations(mainCtxMock); + if (debugStrategy.isHasDuration()) { + mockGetMaxRuleNodeDebugModeDurationMinutes(0); + } + + // WHEN + defaultTbContext.tellNext(msg, connection); + + // THEN + if (shouldPersistAfterDurationTime) { + then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msg, connection, null, null); + } + } + private void checkTellNextCommonLogic(TbMsgCallback callbackMock, String nodeConnection, TbMsg msg) { checkTellNextCommonLogic(callbackMock, Collections.singleton(nodeConnection), msg); } @@ -796,6 +959,7 @@ class DefaultTbContextTest { private static Stream givenDebugStrategyOptions_whenEnqueueForTellNext_thenVerifyDebugOutputPersistedOnlyForAllEventsDebugStrategy() { return Stream.of( Arguments.of(DebugStrategy.ALL_EVENTS, TbNodeConnectionType.OTHER), + Arguments.of(DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS, TbNodeConnectionType.OTHER), Arguments.of(DebugStrategy.ONLY_FAILURE_EVENTS, TbNodeConnectionType.TRUE), Arguments.of(DebugStrategy.DISABLED, TbNodeConnectionType.FALSE) ); @@ -804,11 +968,25 @@ class DefaultTbContextTest { private static Stream givenDebugStrategyOptions_whenEnqueue_thenVerifyDebugOutputPersistedOnlyForAllEventsDebugStrategy() { return Stream.of( Arguments.of(DebugStrategy.ALL_EVENTS), + Arguments.of(DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS), Arguments.of(DebugStrategy.ONLY_FAILURE_EVENTS), Arguments.of(DebugStrategy.DISABLED) ); } + private static Stream givenDebugStrategyAndConnectionAndPersistedResultOptions_whenTellNext_thenVerifyDebugOutputPersistence() { + return Stream.of( + Arguments.of(DebugStrategy.ALL_EVENTS, TbNodeConnectionType.SUCCESS, true, false), + Arguments.of(DebugStrategy.ALL_EVENTS, TbNodeConnectionType.FAILURE, true, false), + Arguments.of(DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS, TbNodeConnectionType.SUCCESS, true, false), + Arguments.of(DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS, TbNodeConnectionType.FAILURE, true, true), + Arguments.of(DebugStrategy.ONLY_FAILURE_EVENTS, TbNodeConnectionType.SUCCESS, false, false), + Arguments.of(DebugStrategy.ONLY_FAILURE_EVENTS, TbNodeConnectionType.FAILURE, true, true), + Arguments.of(DebugStrategy.DISABLED, TbNodeConnectionType.SUCCESS, false, false), + Arguments.of(DebugStrategy.DISABLED, TbNodeConnectionType.FAILURE, false, false) + ); + } + private TbMsg getTbMsgWithCallback(TbMsgCallback callback) { return TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, TENANT_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING, callback); } @@ -822,6 +1000,10 @@ class DefaultTbContextTest { } private void mockGetMaxRuleNodeDebugModeDurationMinutes() { + mockGetMaxRuleNodeDebugModeDurationMinutes(15); + } + + private void mockGetMaxRuleNodeDebugModeDurationMinutes(int maxRuleNodeDebugModeDurationMinutes) { var tbTenantProfileCacheMock = mock(TbTenantProfileCache.class); var tenantProfileMock = mock(TenantProfile.class); var tenantProfileDataMock = mock(TenantProfileData.class); @@ -831,7 +1013,7 @@ class DefaultTbContextTest { given(tbTenantProfileCacheMock.get(TENANT_ID)).willReturn(tenantProfileMock); given(tenantProfileMock.getProfileData()).willReturn(tenantProfileDataMock); given(tenantProfileDataMock.getConfiguration()).willReturn(tenantProfileConfigurationMock); - given(tenantProfileConfigurationMock.getMaxRuleNodeDebugModeDurationMinutes(anyInt())).willReturn(15); + given(tenantProfileConfigurationMock.getMaxRuleNodeDebugModeDurationMinutes(anyInt())).willReturn(maxRuleNodeDebugModeDurationMinutes); } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/DebugStrategy.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/DebugStrategy.java index e23b1bea94..a065c9065f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/DebugStrategy.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/DebugStrategy.java @@ -23,12 +23,17 @@ import java.util.concurrent.TimeUnit; @Getter public enum DebugStrategy { - DISABLED(0), ALL_EVENTS(1), ONLY_FAILURE_EVENTS(2); + DISABLED(0, false), + ALL_EVENTS(1, true), + ALL_THEN_ONLY_FAILURE_EVENTS(2, true), + ONLY_FAILURE_EVENTS(3, false); private final int protoNumber; + private final boolean hasDuration; - DebugStrategy(int protoNumber) { + DebugStrategy(int protoNumber, boolean hasDuration) { this.protoNumber = protoNumber; + this.hasDuration = hasDuration; } public boolean shouldPersistDebugInput(long lastUpdateTs, long msgTs, int debugModeDurationMinutes) { @@ -36,26 +41,23 @@ public enum DebugStrategy { } public boolean shouldPersistDebugOutputForAllEvents(long lastUpdateTs, long msgTs, int debugModeDurationMinutes) { - return isAllEventsStrategyAndMsgTsWithinDebugDuration(lastUpdateTs, msgTs, debugModeDurationMinutes); + return this.isAllEventsStrategyAndMsgTsWithinDebugDuration(lastUpdateTs, msgTs, debugModeDurationMinutes); } - public boolean shouldPersistDebugForFailureEventOnly(Set nodeConnections) { - return isFailureOnlyStrategy() && nodeConnections.contains(TbNodeConnectionType.FAILURE); + public boolean shouldPersistDebugForFailureEvent(Set nodeConnections) { + return isFailureStrategy() && nodeConnections.contains(TbNodeConnectionType.FAILURE); } - public boolean shouldPersistDebugForFailureEventOnly(String nodeConnection) { - return isFailureOnlyStrategy() && TbNodeConnectionType.FAILURE.equals(nodeConnection); + public boolean shouldPersistDebugForFailureEvent(String nodeConnection) { + return isFailureStrategy() && TbNodeConnectionType.FAILURE.equals(nodeConnection); } - private boolean isFailureOnlyStrategy() { - return DebugStrategy.ONLY_FAILURE_EVENTS.equals(this); + private boolean isFailureStrategy() { + return DebugStrategy.ONLY_FAILURE_EVENTS.equals(this) || DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS.equals(this); } private boolean isAllEventsStrategyAndMsgTsWithinDebugDuration(long lastUpdateTs, long msgTs, int debugModeDurationMinutes) { - if (!DebugStrategy.ALL_EVENTS.equals(this)) { - return false; - } - return isMsgTsWithinDebugDuration(lastUpdateTs, msgTs, debugModeDurationMinutes); + return this.hasDuration && isMsgTsWithinDebugDuration(lastUpdateTs, msgTs, debugModeDurationMinutes); } private boolean isMsgTsWithinDebugDuration(long lastUpdateTs, long msgCreationTs, int debugModeDurationMinutes) { diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index f5765f3103..167e160648 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -165,7 +165,8 @@ message RuleChainMetadataUpdateMsg { enum DebugStrategy { DISABLED = 0; ALL_EVENTS = 1; - ONLY_FAILURE_EVENTS = 2; + ALL_THEN_ONLY_FAILURE_EVENTS = 2; + ONLY_FAILURE_EVENTS = 3; } message RuleNodeProto {