additional fixes in utils classes and tests
This commit is contained in:
parent
27d026821a
commit
daa3c766f0
@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.util;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.Data;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.common.util.DonAsynchron;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
@ -40,7 +39,6 @@ import java.util.function.BiFunction;
|
||||
*/
|
||||
@Data
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class SemaphoreWithTbMsgQueue {
|
||||
|
||||
private final EntityId entityId;
|
||||
@ -85,15 +83,15 @@ public class SemaphoreWithTbMsgQueue {
|
||||
semaphore.release();
|
||||
continue;
|
||||
}
|
||||
final TbMsg msg = tbMsgTbContext.getMsg();
|
||||
final TbMsg msg = tbMsgTbContext.msg();
|
||||
if (!msg.getCallback().isMsgValid()) {
|
||||
log.trace("[{}] Skipping non-valid message [{}]", entityId, msg);
|
||||
semaphore.release();
|
||||
continue;
|
||||
}
|
||||
//DO PROCESSING
|
||||
final TbContext ctx = tbMsgTbContext.getCtx();
|
||||
final ListenableFuture<TbMsg> resultMsgFuture = tbMsgTbContext.getBiFunction().apply(ctx, msg);
|
||||
final TbContext ctx = tbMsgTbContext.ctx();
|
||||
final ListenableFuture<TbMsg> resultMsgFuture = tbMsgTbContext.biFunction().apply(ctx, msg);
|
||||
DonAsynchron.withCallback(resultMsgFuture, resultMsg -> {
|
||||
try {
|
||||
ctx.tellSuccess(resultMsg);
|
||||
@ -115,8 +113,8 @@ public class SemaphoreWithTbMsgQueue {
|
||||
log.error("[{}] Failed to process TbMsgTbContext queue", entityId, t);
|
||||
throw t;
|
||||
}
|
||||
TbMsg msg = tbMsgTbContext.getMsg();
|
||||
TbContext ctx = tbMsgTbContext.getCtx();
|
||||
TbMsg msg = tbMsgTbContext.msg();
|
||||
TbContext ctx = tbMsgTbContext.ctx();
|
||||
log.warn("[{}] Failed to process message: {}", entityId, msg, t);
|
||||
ctx.tellFailure(msg, t); // you are not allowed to throw here, because queue will remain unprocessed
|
||||
continue; // We are probably the last who process the queue. We have to continue poll until get successful callback or queue is empty
|
||||
@ -126,15 +124,11 @@ public class SemaphoreWithTbMsgQueue {
|
||||
}
|
||||
|
||||
/**
|
||||
* A utility class to hold the tuple of a {@link TbMsg}, {@link TbContext}, and the message processing function.
|
||||
* A utility record to hold the tuple of a {@link TbMsg}, {@link TbContext}, and the message processing function.
|
||||
* This facilitates passing these three elements as a single object within the queue.
|
||||
*/
|
||||
@Data
|
||||
@RequiredArgsConstructor
|
||||
private static class TbMsgTbContextBiFunction {
|
||||
private final TbMsg msg;
|
||||
private final TbContext ctx;
|
||||
private final BiFunction<TbContext, TbMsg, ListenableFuture<TbMsg>> biFunction;
|
||||
private record TbMsgTbContextBiFunction(TbMsg msg, TbContext ctx,
|
||||
BiFunction<TbContext, TbMsg, ListenableFuture<TbMsg>> biFunction) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -82,7 +82,7 @@ import static org.mockito.Mockito.when;
|
||||
public class CalculateDeltaNodeTest {
|
||||
|
||||
private final DeviceId DUMMY_DEVICE_ORIGINATOR = new DeviceId(UUID.fromString("2ba3ded4-882b-40cf-999a-89da9ccd58f9"));
|
||||
private final TenantId TENANT_ID = new TenantId(UUID.fromString("3842e740-0d89-43a9-8d52-ae44023847ba"));
|
||||
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("3842e740-0d89-43a9-8d52-ae44023847ba"));
|
||||
private final ListeningExecutor DB_EXECUTOR = new TestDbCallbackExecutor();
|
||||
|
||||
private static final int RULE_DISPATCHER_POOL_SIZE = 2;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user