Merge pull request #8983 from smatvienko-tb/feature/onRateLimit-TbMsgPackCallback-skipAllRateLimited

Rule engine: ack all rate limited failures
This commit is contained in:
Andrew Shvayka 2023-07-31 17:17:36 +03:00 committed by GitHub
commit a8d0968cc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 317 additions and 9 deletions

View File

@ -232,7 +232,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
} catch (RuleNodeException rne) { } catch (RuleNodeException rne) {
msg.getCallback().onFailure(rne); msg.getCallback().onFailure(rne);
} catch (Exception e) { } catch (Exception e) {
msg.getCallback().onFailure(new RuleEngineException(e.getMessage())); msg.getCallback().onFailure(new RuleEngineException(e.getMessage(), e));
} }
} }
@ -335,7 +335,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
msg.getCallback().onFailure(rne); msg.getCallback().onFailure(rne);
} catch (Exception e) { } catch (Exception e) {
log.warn("[" + tenantId + "]" + "[" + entityId + "]" + "[" + msg.getId() + "]" + " onTellNext failure", e); log.warn("[" + tenantId + "]" + "[" + entityId + "]" + "[" + msg.getId() + "]" + " onTellNext failure", e);
msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage())); msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage(), e));
} }
} }

View File

@ -349,7 +349,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
callback.onSuccess(); callback.onSuccess();
} }
} catch (Exception e) { } catch (Exception e) {
callback.onFailure(new RuleEngineException(e.getMessage())); callback.onFailure(new RuleEngineException(e.getMessage(), e));
} }
} }

View File

@ -17,11 +17,14 @@ package org.thingsboard.server.service.queue;
import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ExceptionUtil;
import org.thingsboard.server.common.data.exception.AbstractRateLimitException;
import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.RuleNodeInfo; import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
import org.thingsboard.server.common.msg.queue.TbMsgCallback; import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -57,8 +60,23 @@ public class TbMsgPackCallback implements TbMsgCallback {
ctx.onSuccess(id); ctx.onSuccess(id);
} }
@Override
public void onRateLimit(RuleEngineException e) {
log.debug("[{}] ON RATE LIMIT", id, e);
//TODO notify tenant on rate limit
if (failedMsgTimer != null) {
failedMsgTimer.record(System.currentTimeMillis() - startMsgProcessing, TimeUnit.MILLISECONDS);
}
ctx.onSuccess(id);
}
@Override @Override
public void onFailure(RuleEngineException e) { public void onFailure(RuleEngineException e) {
if (ExceptionUtil.lookupExceptionInCause(e, AbstractRateLimitException.class) != null) {
onRateLimit(e);
return;
}
log.trace("[{}] ON FAILURE", id, e); log.trace("[{}] ON FAILURE", id, e);
if (failedMsgTimer != null) { if (failedMsgTimer != null) {
failedMsgTimer.record(System.currentTimeMillis() - startMsgProcessing, TimeUnit.MILLISECONDS); failedMsgTimer.record(System.currentTimeMillis() - startMsgProcessing, TimeUnit.MILLISECONDS);

View File

@ -0,0 +1,98 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.RuleNodeException;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import java.util.UUID;
import java.util.stream.Stream;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
class TbMsgPackCallbackTest {
TenantId tenantId;
UUID msgId;
TbMsgPackProcessingContext ctx;
TbMsgPackCallback callback;
@BeforeEach
void setUp() {
tenantId = TenantId.fromUUID(UUID.randomUUID());
msgId = UUID.randomUUID();
ctx = mock(TbMsgPackProcessingContext.class);
callback = spy(new TbMsgPackCallback(msgId, tenantId, ctx));
}
private static Stream<Arguments> testOnFailure_NotRateLimitException() {
return Stream.of(
Arguments.of(new RuleEngineException("rule engine no cause")),
Arguments.of(new RuleEngineException("rule engine caused 1 lvl", new RuntimeException())),
Arguments.of(new RuleEngineException("rule engine caused 2 lvl", new RuntimeException(new Exception()))),
Arguments.of(new RuleEngineException("rule engine caused 2 lvl Throwable", new RuntimeException(new Throwable()))),
Arguments.of(new RuleNodeException("rule node no cause", "RuleChain", new RuleNode()))
);
}
@ParameterizedTest
@MethodSource
void testOnFailure_NotRateLimitException(RuleEngineException ree) {
callback.onFailure(ree);
verify(callback, never()).onRateLimit(any());
verify(callback, never()).onSuccess();
verify(ctx, never()).onSuccess(any());
}
private static Stream<Arguments> testOnFailure_RateLimitException() {
return Stream.of(
Arguments.of(new RuleEngineException("caused lvl 1", new TbRateLimitsException(EntityType.ASSET))),
Arguments.of(new RuleEngineException("caused lvl 2", new RuntimeException(new TbRateLimitsException(EntityType.ASSET)))),
Arguments.of(
new RuleEngineException("caused lvl 3",
new RuntimeException(
new Exception(
new TbRateLimitsException(EntityType.ASSET)))))
);
}
@ParameterizedTest
@MethodSource
void testOnFailure_RateLimitException(RuleEngineException ree) {
callback.onFailure(ree);
verify(callback).onRateLimit(any());
verify(callback).onFailure(any());
verify(callback, never()).onSuccess();
verify(ctx).onSuccess(msgId);
verify(ctx).onSuccess(any());
verify(ctx, never()).onFailure(any(), any(), any());
}
}

View File

@ -0,0 +1,41 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.exception;
public abstract class AbstractRateLimitException extends RuntimeException {
public AbstractRateLimitException() {
super();
}
public AbstractRateLimitException(String message) {
super(message);
}
public AbstractRateLimitException(String message, Throwable cause) {
super(message, cause);
}
public AbstractRateLimitException(Throwable cause) {
super(cause);
}
protected AbstractRateLimitException(String message, Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -15,7 +15,7 @@
*/ */
package org.thingsboard.server.common.data.exception; package org.thingsboard.server.common.data.exception;
public class ApiUsageLimitsExceededException extends RuntimeException { public class ApiUsageLimitsExceededException extends AbstractRateLimitException {
public ApiUsageLimitsExceededException(String message) { public ApiUsageLimitsExceededException(String message) {
super(message); super(message);
} }

View File

@ -32,6 +32,11 @@ public class RuleEngineException extends Exception {
ts = System.currentTimeMillis(); ts = System.currentTimeMillis();
} }
public RuleEngineException(String message, Throwable t) {
super(message != null ? message : "Unknown", t);
ts = System.currentTimeMillis();
}
public String toJsonString() { public String toJsonString() {
try { try {
return mapper.writeValueAsString(mapper.createObjectNode().put("message", getMessage())); return mapper.writeValueAsString(mapper.createObjectNode().put("message", getMessage()));

View File

@ -39,6 +39,10 @@ public interface TbMsgCallback {
void onFailure(RuleEngineException e); void onFailure(RuleEngineException e);
default void onRateLimit(RuleEngineException e) {
onFailure(e);
};
/** /**
* Returns 'true' if rule engine is expecting the message to be processed, 'false' otherwise. * Returns 'true' if rule engine is expecting the message to be processed, 'false' otherwise.
* message may no longer be valid, if the message pack is already expired/canceled/failed. * message may no longer be valid, if the message pack is already expired/canceled/failed.

View File

@ -17,11 +17,12 @@ package org.thingsboard.server.common.msg.tools;
import lombok.Getter; import lombok.Getter;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.exception.AbstractRateLimitException;
/** /**
* Created by ashvayka on 22.10.18. * Created by ashvayka on 22.10.18.
*/ */
public class TbRateLimitsException extends RuntimeException { public class TbRateLimitsException extends AbstractRateLimitException {
@Getter @Getter
private final EntityType entityType; private final EntityType entityType;

View File

@ -40,6 +40,6 @@ public class MultipleTbQueueCallbackWrapper implements TbQueueCallback {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
callback.onFailure(new RuleEngineException(t.getMessage())); callback.onFailure(new RuleEngineException(t.getMessage(), t));
} }
} }

View File

@ -41,6 +41,6 @@ public class MultipleTbQueueTbMsgCallbackWrapper implements TbQueueCallback {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
tbMsgCallback.onFailure(new RuleEngineException(t.getMessage())); tbMsgCallback.onFailure(new RuleEngineException(t.getMessage(), t));
} }
} }

View File

@ -35,6 +35,6 @@ public class TbQueueTbMsgCallbackWrapper implements TbQueueCallback {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
tbMsgCallback.onFailure(new RuleEngineException(t.getMessage())); tbMsgCallback.onFailure(new RuleEngineException(t.getMessage(), t));
} }
} }

View File

@ -0,0 +1,67 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import com.google.gson.JsonParseException;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.EntityId;
import javax.script.ScriptException;
import java.io.PrintWriter;
import java.io.StringWriter;
@Slf4j
public class ExceptionUtil {
@SuppressWarnings("unchecked")
public static <T extends Exception> T lookupException(Throwable source, Class<T> clazz) {
Exception e = lookupExceptionInCause(source, clazz);
if (e != null) {
return (T) e;
} else {
return null;
}
}
public static Exception lookupExceptionInCause(Throwable source, Class<? extends Exception>... clazzes) {
while (source != null) {
for (Class<? extends Exception> clazz : clazzes) {
if (clazz.isAssignableFrom(source.getClass())) {
return (Exception) source;
}
}
source = source.getCause();
}
return null;
}
public static String toString(Exception e, EntityId componentId, boolean stackTraceEnabled) {
Exception exception = lookupExceptionInCause(e, ScriptException.class, JsonParseException.class);
if (exception != null && StringUtils.isNotEmpty(exception.getMessage())) {
return exception.getMessage();
} else {
if (stackTraceEnabled) {
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
return sw.toString();
} else {
log.debug("[{}] Unknown error during message processing", componentId, e);
return "Please contact system administrator";
}
}
}
}

View File

@ -0,0 +1,74 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import static org.assertj.core.api.Assertions.assertThat;
class ExceptionUtilTest {
final Exception cause = new RuntimeException();
@Test
void givenRootCause_whenLookupExceptionInCause_thenReturnRootCauseAndNoStackOverflow() {
Exception e = cause;
for (int i = 0; i <= 16384; i++) {
e = new Exception(e);
}
assertThat(ExceptionUtil.lookupExceptionInCause(e, RuntimeException.class)).isSameAs(cause);
}
@Test
void givenCause_whenLookupExceptionInCause_thenReturnCause() {
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(cause), RuntimeException.class)).isSameAs(cause);
}
@Test
void givenNoCauseAndExceptionIsWantedCauseClass_whenLookupExceptionInCause_thenReturnSelf() {
assertThat(ExceptionUtil.lookupExceptionInCause(cause, RuntimeException.class)).isSameAs(cause);
}
@Test
void givenNoCause_whenLookupExceptionInCause_thenReturnNull() {
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(), RuntimeException.class)).isNull();
}
@Test
void givenNotWantedCause_whenLookupExceptionInCause_thenReturnNull() {
final Exception cause = new IOException();
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(cause), RuntimeException.class)).isNull();
}
@Test
void givenCause_whenLookupExceptionInCauseByMany_thenReturnFirstCause() {
final Exception causeIAE = new IllegalAccessException();
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE))).isNull();
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IOException.class, NoSuchFieldException.class)).isNull();
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IllegalAccessException.class, IOException.class, NoSuchFieldException.class)).isSameAs(causeIAE);
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), IOException.class, NoSuchFieldException.class, IllegalAccessException.class)).isSameAs(causeIAE);
final Exception causeIOE = new IOException(causeIAE);
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE))).isNull();
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIAE), ClassNotFoundException.class, NoSuchFieldException.class)).isNull();
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IOException.class, NoSuchFieldException.class)).isSameAs(causeIOE);
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IllegalAccessException.class, IOException.class, NoSuchFieldException.class)).isSameAs(causeIOE);
assertThat(ExceptionUtil.lookupExceptionInCause(new Exception(causeIOE), IOException.class, NoSuchFieldException.class, IllegalAccessException.class)).isSameAs(causeIOE);
}
}

View File

@ -39,7 +39,7 @@ public class MultipleTbMsgsCallbackWrapper implements TbMsgCallbackWrapper {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
callback.onFailure(new RuleEngineException(t.getMessage())); callback.onFailure(new RuleEngineException(t.getMessage(), t));
} }
} }