refactoring
This commit is contained in:
parent
e8dc2ddc2d
commit
1e2c7f9579
@ -185,7 +185,7 @@ class DefaultTbContext implements TbContext {
|
||||
|
||||
@Override
|
||||
public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
||||
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, nodeCtx.getSelf().getQueueName(), getTenantId(), tbMsg.getOriginator());
|
||||
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator());
|
||||
enqueue(tpi, tbMsg, onFailure, onSuccess);
|
||||
}
|
||||
|
||||
@ -310,7 +310,7 @@ class DefaultTbContext implements TbContext {
|
||||
|
||||
@Override
|
||||
public boolean isLocalEntity(EntityId entityId) {
|
||||
return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, nodeCtx.getSelf().getQueueName(), getTenantId(), entityId).isMyPartition();
|
||||
return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), entityId).isMyPartition();
|
||||
}
|
||||
|
||||
private void scheduleMsgWithDelay(TbActorMsg msg, long delayInMs, TbActorRef target) {
|
||||
@ -509,6 +509,11 @@ class DefaultTbContext implements TbContext {
|
||||
return ruleChainName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getQueueName() {
|
||||
return getSelf().getQueueName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TenantId getTenantId() {
|
||||
return nodeCtx.getTenantId();
|
||||
|
||||
@ -255,6 +255,8 @@ public interface TbContext {
|
||||
|
||||
String getRuleChainName();
|
||||
|
||||
String getQueueName();
|
||||
|
||||
TenantId getTenantId();
|
||||
|
||||
AttributesService getAttributesService();
|
||||
|
||||
@ -83,7 +83,7 @@ public class TbMsgGeneratorNode implements TbNode {
|
||||
this.config = TbNodeUtils.convert(configuration, TbMsgGeneratorNodeConfiguration.class);
|
||||
this.delay = TimeUnit.SECONDS.toMillis(config.getPeriodInSeconds());
|
||||
this.currentMsgCount = 0;
|
||||
this.queueName = ctx.getSelf().getQueueName();
|
||||
this.queueName = ctx.getQueueName();
|
||||
if (!StringUtils.isEmpty(config.getOriginatorId())) {
|
||||
originatorId = EntityIdFactory.getByTypeAndUuid(config.getOriginatorType(), config.getOriginatorId());
|
||||
ctx.checkTenantEntity(originatorId);
|
||||
|
||||
@ -81,7 +81,7 @@ public class TbMsgDeduplicationNode implements TbNode {
|
||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||
this.config = TbNodeUtils.convert(configuration, TbMsgDeduplicationNodeConfiguration.class);
|
||||
this.deduplicationInterval = TimeUnit.SECONDS.toMillis(config.getInterval());
|
||||
this.queueName = ctx.getSelf().getQueueName();
|
||||
this.queueName = ctx.getQueueName();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -50,7 +50,7 @@ public class TbCheckpointNode implements TbNode {
|
||||
|
||||
@Override
|
||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||
this.queueName = ctx.getSelf().getQueueName();
|
||||
this.queueName = ctx.getQueueName();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -94,7 +94,6 @@ public class TbMsgDeduplicationNodeTest {
|
||||
|
||||
when(ctx.getSelfId()).thenReturn(ruleNodeId);
|
||||
when(ctx.getTenantId()).thenReturn(tenantId);
|
||||
when(ctx.getSelf()).thenReturn(new RuleNode());
|
||||
|
||||
doAnswer((Answer<TbMsg>) invocationOnMock -> {
|
||||
TbMsgType type = (TbMsgType) (invocationOnMock.getArguments())[1];
|
||||
@ -240,10 +239,10 @@ public class TbMsgDeduplicationNodeTest {
|
||||
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
|
||||
invokeTellSelf(wantedNumberOfTellSelfInvocation);
|
||||
|
||||
when(ctx.getQueueName()).thenReturn(DataConstants.HP_QUEUE_NAME);
|
||||
config.setInterval(deduplicationInterval);
|
||||
config.setStrategy(DeduplicationStrategy.ALL);
|
||||
config.setOutMsgType(TbMsgType.POST_ATTRIBUTES_REQUEST.name());
|
||||
setQueueName(DataConstants.HP_QUEUE_NAME);
|
||||
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
|
||||
node.init(ctx, nodeConfiguration);
|
||||
|
||||
@ -280,10 +279,10 @@ public class TbMsgDeduplicationNodeTest {
|
||||
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
|
||||
invokeTellSelf(wantedNumberOfTellSelfInvocation, true, 3);
|
||||
|
||||
when(ctx.getQueueName()).thenReturn(DataConstants.HP_QUEUE_NAME);
|
||||
config.setInterval(deduplicationInterval);
|
||||
config.setStrategy(DeduplicationStrategy.ALL);
|
||||
config.setOutMsgType(TbMsgType.POST_ATTRIBUTES_REQUEST.name());
|
||||
setQueueName(DataConstants.HP_QUEUE_NAME);
|
||||
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
|
||||
node.init(ctx, nodeConfiguration);
|
||||
|
||||
@ -431,10 +430,4 @@ public class TbMsgDeduplicationNodeTest {
|
||||
return JacksonUtil.toString(mergedData);
|
||||
}
|
||||
|
||||
private void setQueueName(String queueName) {
|
||||
RuleNode ruleNode = new RuleNode();
|
||||
ruleNode.setQueueName(queueName);
|
||||
when(ctx.getSelf()).thenReturn(ruleNode);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user