added new debug strategy ALL_THEN_ONLY_FAILURE_EVENTS

This commit is contained in:
YevhenBondarenko 2024-11-05 20:18:15 +01:00
parent 2e28696068
commit 5c6cd2dda8
5 changed files with 212 additions and 27 deletions

View File

@ -1005,19 +1005,19 @@ public class DefaultTbContext implements TbContext {
DebugStrategy debugStrategy = ruleNode.getDebugStrategy();
if (debugStrategy.shouldPersistDebugOutputForAllEvents(ruleNode.getLastUpdateTs(), msg.getTs(), getMaxRuleNodeDebugDurationMinutes())) {
relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(getTenantId(), ruleNode.getId(), msg, relationType, error, failureMessage));
} else if (debugStrategy.shouldPersistDebugForFailureEventOnly(relationTypes)) {
} else if (debugStrategy.shouldPersistDebugForFailureEvent(relationTypes)) {
mainCtx.persistDebugOutput(getTenantId(), ruleNode.getId(), msg, TbNodeConnectionType.FAILURE, error, failureMessage);
}
}
private int getMaxRuleNodeDebugDurationMinutes() {
if (!DebugStrategy.ALL_EVENTS.equals(nodeCtx.getSelf().getDebugStrategy())) {
return 0;
if (nodeCtx.getSelf().getDebugStrategy().isHasDuration()) {
var configuration = mainCtx.getTenantProfileCache()
.get(getTenantId()).getProfileData().getConfiguration();
int systemMaxRuleNodeDebugModeDurationMinutes = mainCtx.getMaxRuleNodeDebugModeDurationMinutes();
return configuration.getMaxRuleNodeDebugModeDurationMinutes(systemMaxRuleNodeDebugModeDurationMinutes);
}
var configuration = mainCtx.getTenantProfileCache()
.get(getTenantId()).getProfileData().getConfiguration();
int systemMaxRuleNodeDebugModeDurationMinutes = mainCtx.getMaxRuleNodeDebugModeDurationMinutes();
return configuration.getMaxRuleNodeDebugModeDurationMinutes(systemMaxRuleNodeDebugModeDurationMinutes);
return 0;
}
}

View File

@ -260,7 +260,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
int maxRuleNodeDebugModeDurationMinutes = getTenantProfileConfiguration()
.getMaxRuleNodeDebugModeDurationMinutes(systemContext.getMaxRuleNodeDebugModeDurationMinutes());
boolean shouldPersistDebugOutput = debugStrategy.shouldPersistDebugOutputForAllEvents(ruleNode.getLastUpdateTs(), tbMsg.getTs(), maxRuleNodeDebugModeDurationMinutes) ||
debugStrategy.shouldPersistDebugForFailureEventOnly(envelope.getRelationType());
debugStrategy.shouldPersistDebugForFailureEvent(envelope.getRelationType());
if (shouldPersistDebugOutput) {
systemContext.persistDebugOutput(tenantId, originatorNodeId, tbMsg, envelope.getRelationType());
}

View File

@ -24,6 +24,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorRef;
@ -74,6 +75,7 @@ import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
@SuppressWarnings("ResultOfMethodCallIgnored")
@ -268,6 +270,70 @@ class DefaultTbContextTest {
checkTellNextCommonLogic(callbackMock, connections, msg);
}
@MethodSource
@ParameterizedTest
void givenDebugStrategyAllThenOnlyFailureEventsAndConnection_whenTellNext_thenVerifyDebugOutputPersisted(String connection) {
// GIVEN
var callbackMock = mock(TbMsgCallback.class);
var msg = getTbMsgWithCallback(callbackMock);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setLastUpdateTs(System.currentTimeMillis());
ruleNode.setDebugStrategy(DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
mockGetMaxRuleNodeDebugModeDurationMinutes();
// WHEN
defaultTbContext.tellNext(msg, connection);
// THEN
then(nodeCtxMock).should().getChainActor();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
then(mainCtxMock).should().getTenantProfileCache();
then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes();
then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msg, connection, null, null);
then(mainCtxMock).shouldHaveNoMoreInteractions();
checkTellNextCommonLogic(callbackMock, connection, msg);
}
private static Stream<String> givenDebugStrategyAllThenOnlyFailureEventsAndConnection_whenTellNext_thenVerifyDebugOutputPersisted() {
return failureAndSuccessConnection();
}
@Test
public void givenDebugStrategyAllThenOnlyEventsAndFailureAndSuccessConnection_whenTellNext_thenVerifyDebugOutputPersistedForAllEvents() {
// GIVEN
var callbackMock = mock(TbMsgCallback.class);
var msg = getTbMsgWithCallback(callbackMock);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setLastUpdateTs(System.currentTimeMillis());
ruleNode.setDebugStrategy(DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
mockGetMaxRuleNodeDebugModeDurationMinutes();
// WHEN
Set<String> connections = failureAndSuccessConnection().collect(Collectors.toSet());
defaultTbContext.tellNext(msg, connections);
// THEN
then(nodeCtxMock).should().getChainActor();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
then(mainCtxMock).should().getTenantProfileCache();
then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes();
var nodeConnectionsCaptor = ArgumentCaptor.forClass(String.class);
int wantedNumberOfInvocations = connections.size();
then(mainCtxMock).should(times(wantedNumberOfInvocations)).persistDebugOutput(eq(TENANT_ID), eq(RULE_NODE_ID), eq(msg), nodeConnectionsCaptor.capture(), nullable(Throwable.class), nullable(String.class));
then(mainCtxMock).shouldHaveNoMoreInteractions();
assertThat(nodeConnectionsCaptor.getAllValues()).hasSize(wantedNumberOfInvocations);
assertThat(nodeConnectionsCaptor.getAllValues()).containsExactlyInAnyOrderElementsOf(connections);
checkTellNextCommonLogic(callbackMock, connections, msg);
}
private static Stream<String> failureAndSuccessConnection() {
return Stream.of(TbNodeConnectionType.FAILURE, TbNodeConnectionType.SUCCESS);
}
@ -361,6 +427,32 @@ class DefaultTbContextTest {
then(nodeCtxMock).shouldHaveNoMoreInteractions();
}
@ParameterizedTest
@ValueSource(strings = {TbNodeConnectionType.SUCCESS, TbNodeConnectionType.FAILURE})
void givenDebugStrategyAllThenOnlyFailureEvents_whenOutput_thenVerifyDebugOutputPersisted(String nodeConnection) {
// GIVEN
var msgMock = mock(TbMsg.class);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.ALL_EVENTS);
given(msgMock.popFormStack()).willReturn(new TbMsgProcessingStackItem(RULE_CHAIN_ID, RULE_NODE_ID));
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
mockGetMaxRuleNodeDebugModeDurationMinutes();
// WHEN
defaultTbContext.output(msgMock, nodeConnection);
// THEN
checkOutputCommonLogic(msgMock, nodeConnection);
then(mainCtxMock).should().getTenantProfileCache();
then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes();
then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msgMock, nodeConnection, null, null);
then(mainCtxMock).shouldHaveNoMoreInteractions();
then(nodeCtxMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenEmptyStack_whenOutput_thenVerifyMsgAck() {
// GIVEN
@ -409,6 +501,32 @@ class DefaultTbContextTest {
then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msgMock, TbNodeConnectionType.ACK, null, null);
}
@Test
public void givenEmptyStackAndDebugStrategyAllThenOnlyFailureEvents_whenOutput_thenVerifyMsgAckAndDebugOutputPersisted() {
// GIVEN
var msgMock = mock(TbMsg.class);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setDebugStrategy(DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS);
ruleNode.setLastUpdateTs(System.currentTimeMillis());
given(msgMock.popFormStack()).willReturn(null);
TbMsgCallback callbackMock = mock(TbMsgCallback.class);
given(msgMock.getCallback()).willReturn(callbackMock);
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
mockGetMaxRuleNodeDebugModeDurationMinutes();
// WHEN
defaultTbContext.output(msgMock, TbNodeConnectionType.SUCCESS);
// THEN
then(msgMock).should().popFormStack();
then(callbackMock).should().onProcessingEnd(RULE_NODE_ID);
then(callbackMock).should().onSuccess();
then(nodeCtxMock).should(never()).getChainActor();
then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msgMock, TbNodeConnectionType.ACK, null, null);
}
@Test
public void givenDebugStrategyOnlyFailureEvents_whenEnqueueForTellFailure_thenVerifyDebugOutputPersisted() {
// GIVEN
@ -551,7 +669,7 @@ class DefaultTbContextTest {
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(mainCtxMock.resolve(any(ServiceType.class), anyString(), any(TenantId.class), any(EntityId.class))).willReturn(tpi);
given(mainCtxMock.getClusterService()).willReturn(tbClusterServiceMock);
if (DebugStrategy.ALL_EVENTS.equals(debugStrategy)) {
if (DebugStrategy.ALL_EVENTS.equals(debugStrategy) || DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS.equals(debugStrategy)) {
mockGetMaxRuleNodeDebugModeDurationMinutes();
}
@ -579,7 +697,7 @@ class DefaultTbContextTest {
assertThat(simpleTbQueueCallback).isNotNull();
simpleTbQueueCallback.onSuccess(null);
if (DebugStrategy.ALL_EVENTS.equals(debugStrategy)) {
if (DebugStrategy.ALL_EVENTS.equals(debugStrategy) || DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS.equals(debugStrategy)) {
then(mainCtxMock).should().getTenantProfileCache();
then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes();
ArgumentCaptor<TbMsg> tbMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
@ -611,7 +729,7 @@ class DefaultTbContextTest {
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(mainCtxMock.resolve(any(ServiceType.class), anyString(), any(TenantId.class), any(EntityId.class))).willReturn(tpi);
given(mainCtxMock.getClusterService()).willReturn(tbClusterServiceMock);
if (DebugStrategy.ALL_EVENTS.equals(debugStrategy)) {
if (DebugStrategy.ALL_EVENTS.equals(debugStrategy) || DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS.equals(debugStrategy)) {
mockGetMaxRuleNodeDebugModeDurationMinutes();
}
@ -642,7 +760,7 @@ class DefaultTbContextTest {
assertThat(simpleTbQueueCallback).isNotNull();
simpleTbQueueCallback.onSuccess(null);
if (DebugStrategy.ALL_EVENTS.equals(debugStrategy)) {
if (debugStrategy.isHasDuration()) {
then(mainCtxMock).should().getTenantProfileCache();
then(mainCtxMock).should().getMaxRuleNodeDebugModeDurationMinutes();
then(mainCtxMock).should().persistDebugOutput(eq(TENANT_ID), eq(RULE_NODE_ID), eq(msg), eq(TbNodeConnectionType.TO_ROOT_RULE_CHAIN), nullable(Throwable.class), nullable(String.class));
@ -744,6 +862,51 @@ class DefaultTbContextTest {
then(nodeCtxMock).shouldHaveNoMoreInteractions();
}
@MethodSource
@ParameterizedTest
void givenDebugStrategyAndConnectionAndPersistedResultOptions_whenTellNext_thenVerifyDebugOutputPersistence(DebugStrategy debugStrategy,
String connection,
boolean shouldPersist,
boolean shouldPersistAfterDurationTime) {
// GIVEN
var callbackMock = mock(TbMsgCallback.class);
var msg = getTbMsgWithCallback(callbackMock);
var ruleNode = new RuleNode(RULE_NODE_ID);
ruleNode.setRuleChainId(RULE_CHAIN_ID);
ruleNode.setLastUpdateTs(System.currentTimeMillis());
ruleNode.setDebugStrategy(debugStrategy);
if (shouldPersist) {
given(nodeCtxMock.getTenantId()).willReturn(TENANT_ID);
}
given(nodeCtxMock.getSelf()).willReturn(ruleNode);
given(nodeCtxMock.getChainActor()).willReturn(chainActorMock);
if (debugStrategy.isHasDuration()) {
mockGetMaxRuleNodeDebugModeDurationMinutes();
}
// WHEN
defaultTbContext.tellNext(msg, connection);
// THEN
if (shouldPersist) {
then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msg, connection, null, null);
}
// GIVEN
Mockito.clearInvocations(mainCtxMock);
if (debugStrategy.isHasDuration()) {
mockGetMaxRuleNodeDebugModeDurationMinutes(0);
}
// WHEN
defaultTbContext.tellNext(msg, connection);
// THEN
if (shouldPersistAfterDurationTime) {
then(mainCtxMock).should().persistDebugOutput(TENANT_ID, RULE_NODE_ID, msg, connection, null, null);
}
}
private void checkTellNextCommonLogic(TbMsgCallback callbackMock, String nodeConnection, TbMsg msg) {
checkTellNextCommonLogic(callbackMock, Collections.singleton(nodeConnection), msg);
}
@ -796,6 +959,7 @@ class DefaultTbContextTest {
private static Stream<Arguments> givenDebugStrategyOptions_whenEnqueueForTellNext_thenVerifyDebugOutputPersistedOnlyForAllEventsDebugStrategy() {
return Stream.of(
Arguments.of(DebugStrategy.ALL_EVENTS, TbNodeConnectionType.OTHER),
Arguments.of(DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS, TbNodeConnectionType.OTHER),
Arguments.of(DebugStrategy.ONLY_FAILURE_EVENTS, TbNodeConnectionType.TRUE),
Arguments.of(DebugStrategy.DISABLED, TbNodeConnectionType.FALSE)
);
@ -804,11 +968,25 @@ class DefaultTbContextTest {
private static Stream<Arguments> givenDebugStrategyOptions_whenEnqueue_thenVerifyDebugOutputPersistedOnlyForAllEventsDebugStrategy() {
return Stream.of(
Arguments.of(DebugStrategy.ALL_EVENTS),
Arguments.of(DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS),
Arguments.of(DebugStrategy.ONLY_FAILURE_EVENTS),
Arguments.of(DebugStrategy.DISABLED)
);
}
private static Stream<Arguments> givenDebugStrategyAndConnectionAndPersistedResultOptions_whenTellNext_thenVerifyDebugOutputPersistence() {
return Stream.of(
Arguments.of(DebugStrategy.ALL_EVENTS, TbNodeConnectionType.SUCCESS, true, false),
Arguments.of(DebugStrategy.ALL_EVENTS, TbNodeConnectionType.FAILURE, true, false),
Arguments.of(DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS, TbNodeConnectionType.SUCCESS, true, false),
Arguments.of(DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS, TbNodeConnectionType.FAILURE, true, true),
Arguments.of(DebugStrategy.ONLY_FAILURE_EVENTS, TbNodeConnectionType.SUCCESS, false, false),
Arguments.of(DebugStrategy.ONLY_FAILURE_EVENTS, TbNodeConnectionType.FAILURE, true, true),
Arguments.of(DebugStrategy.DISABLED, TbNodeConnectionType.SUCCESS, false, false),
Arguments.of(DebugStrategy.DISABLED, TbNodeConnectionType.FAILURE, false, false)
);
}
private TbMsg getTbMsgWithCallback(TbMsgCallback callback) {
return TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, TENANT_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING, callback);
}
@ -822,6 +1000,10 @@ class DefaultTbContextTest {
}
private void mockGetMaxRuleNodeDebugModeDurationMinutes() {
mockGetMaxRuleNodeDebugModeDurationMinutes(15);
}
private void mockGetMaxRuleNodeDebugModeDurationMinutes(int maxRuleNodeDebugModeDurationMinutes) {
var tbTenantProfileCacheMock = mock(TbTenantProfileCache.class);
var tenantProfileMock = mock(TenantProfile.class);
var tenantProfileDataMock = mock(TenantProfileData.class);
@ -831,7 +1013,7 @@ class DefaultTbContextTest {
given(tbTenantProfileCacheMock.get(TENANT_ID)).willReturn(tenantProfileMock);
given(tenantProfileMock.getProfileData()).willReturn(tenantProfileDataMock);
given(tenantProfileDataMock.getConfiguration()).willReturn(tenantProfileConfigurationMock);
given(tenantProfileConfigurationMock.getMaxRuleNodeDebugModeDurationMinutes(anyInt())).willReturn(15);
given(tenantProfileConfigurationMock.getMaxRuleNodeDebugModeDurationMinutes(anyInt())).willReturn(maxRuleNodeDebugModeDurationMinutes);
}
}

View File

@ -23,12 +23,17 @@ import java.util.concurrent.TimeUnit;
@Getter
public enum DebugStrategy {
DISABLED(0), ALL_EVENTS(1), ONLY_FAILURE_EVENTS(2);
DISABLED(0, false),
ALL_EVENTS(1, true),
ALL_THEN_ONLY_FAILURE_EVENTS(2, true),
ONLY_FAILURE_EVENTS(3, false);
private final int protoNumber;
private final boolean hasDuration;
DebugStrategy(int protoNumber) {
DebugStrategy(int protoNumber, boolean hasDuration) {
this.protoNumber = protoNumber;
this.hasDuration = hasDuration;
}
public boolean shouldPersistDebugInput(long lastUpdateTs, long msgTs, int debugModeDurationMinutes) {
@ -36,26 +41,23 @@ public enum DebugStrategy {
}
public boolean shouldPersistDebugOutputForAllEvents(long lastUpdateTs, long msgTs, int debugModeDurationMinutes) {
return isAllEventsStrategyAndMsgTsWithinDebugDuration(lastUpdateTs, msgTs, debugModeDurationMinutes);
return this.isAllEventsStrategyAndMsgTsWithinDebugDuration(lastUpdateTs, msgTs, debugModeDurationMinutes);
}
public boolean shouldPersistDebugForFailureEventOnly(Set<String> nodeConnections) {
return isFailureOnlyStrategy() && nodeConnections.contains(TbNodeConnectionType.FAILURE);
public boolean shouldPersistDebugForFailureEvent(Set<String> nodeConnections) {
return isFailureStrategy() && nodeConnections.contains(TbNodeConnectionType.FAILURE);
}
public boolean shouldPersistDebugForFailureEventOnly(String nodeConnection) {
return isFailureOnlyStrategy() && TbNodeConnectionType.FAILURE.equals(nodeConnection);
public boolean shouldPersistDebugForFailureEvent(String nodeConnection) {
return isFailureStrategy() && TbNodeConnectionType.FAILURE.equals(nodeConnection);
}
private boolean isFailureOnlyStrategy() {
return DebugStrategy.ONLY_FAILURE_EVENTS.equals(this);
private boolean isFailureStrategy() {
return DebugStrategy.ONLY_FAILURE_EVENTS.equals(this) || DebugStrategy.ALL_THEN_ONLY_FAILURE_EVENTS.equals(this);
}
private boolean isAllEventsStrategyAndMsgTsWithinDebugDuration(long lastUpdateTs, long msgTs, int debugModeDurationMinutes) {
if (!DebugStrategy.ALL_EVENTS.equals(this)) {
return false;
}
return isMsgTsWithinDebugDuration(lastUpdateTs, msgTs, debugModeDurationMinutes);
return this.hasDuration && isMsgTsWithinDebugDuration(lastUpdateTs, msgTs, debugModeDurationMinutes);
}
private boolean isMsgTsWithinDebugDuration(long lastUpdateTs, long msgCreationTs, int debugModeDurationMinutes) {

View File

@ -165,7 +165,8 @@ message RuleChainMetadataUpdateMsg {
enum DebugStrategy {
DISABLED = 0;
ALL_EVENTS = 1;
ONLY_FAILURE_EVENTS = 2;
ALL_THEN_ONLY_FAILURE_EVENTS = 2;
ONLY_FAILURE_EVENTS = 3;
}
message RuleNodeProto {