Refactor TbMsg copying

This commit is contained in:
ViacheslavKlimov 2024-12-13 15:18:37 +02:00
parent 0b3ffb5b4a
commit 1b6256a9e8
3 changed files with 42 additions and 44 deletions

View File

@ -173,7 +173,10 @@ public class DefaultTbContext implements TbContext {
if (!msg.isValid()) {
return;
}
TbMsg tbMsg = msg.copyWithRuleChainId(ruleChainId);
TbMsg tbMsg = msg.copy()
.ruleChainId(ruleChainId)
.ruleNodeId(null)
.build();
tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator());
doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t)));

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.actors.ruleChain;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.DebugModeUtil;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorRef;
@ -35,7 +36,6 @@ import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.common.util.DebugModeUtil;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.plugin.RuleNodeUpdatedMsg;
@ -217,7 +217,10 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
RuleNodeCtx targetCtx;
if (targetId == null) {
targetCtx = firstNode;
msg = msg.copyWithRuleChainId(entityId);
msg = msg.copy()
.ruleChainId(entityId)
.ruleNodeId(null)
.build();
} else {
targetCtx = nodeActors.get(targetId);
}
@ -343,10 +346,18 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private void putToQueue(TopicPartitionInfo tpi, TbMsg msg, TbQueueCallback callbackWrapper, EntityId target) {
switch (target.getEntityType()) {
case RULE_NODE:
putToQueue(tpi, msg.copyWithRuleNodeId(entityId, new RuleNodeId(target.getId()), UUID.randomUUID()), callbackWrapper);
putToQueue(tpi, msg.copy()
.id(UUID.randomUUID())
.ruleChainId(entityId)
.ruleNodeId(new RuleNodeId(target.getId()))
.build(), callbackWrapper);
break;
case RULE_CHAIN:
putToQueue(tpi, msg.copyWithRuleChainId(new RuleChainId(target.getId()), UUID.randomUUID()), callbackWrapper);
putToQueue(tpi, msg.copy()
.id(UUID.randomUUID())
.ruleChainId(new RuleChainId(target.getId()))
.ruleNodeId(null)
.build(), callbackWrapper);
break;
}
}

View File

@ -19,7 +19,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@ -38,14 +37,11 @@ import java.io.Serializable;
import java.util.Objects;
import java.util.UUID;
import static java.util.Objects.requireNonNull;
/**
* Created by ashvayka on 13.01.18.
*/
@Data
@Slf4j
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public final class TbMsg implements Serializable {
public static final String EMPTY_JSON_OBJECT = "{}";
@ -77,8 +73,16 @@ public final class TbMsg implements Serializable {
@JsonIgnore
transient private final TbMsgCallback callback;
public int getAndIncrementRuleNodeCounter() {
return ctx.getAndIncrementRuleNodeCounter();
public static TbMsgBuilder newMsg() {
return new TbMsgBuilder();
}
public TbMsgBuilder transform() {
return new TbMsgTransformer(this);
}
public TbMsgBuilder copy() {
return new TbMsgBuilder(this);
}
public TbMsg transform(String queueName) {
@ -100,23 +104,11 @@ public final class TbMsg implements Serializable {
.build();
}
public TbMsg copyWithRuleChainId(RuleChainId ruleChainId) {
return copyWithRuleChainId(ruleChainId, this.id);
}
public TbMsg copyWithRuleChainId(RuleChainId ruleChainId, UUID msgId) {
return new TbMsg(this.queueName, msgId, this.ts, this.internalType, this.type, this.originator, this.customerId,
this.metaData, this.dataType, this.data, ruleChainId, null, this.correlationId, this.partition, this.ctx, callback);
}
public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId, UUID msgId) {
return new TbMsg(this.queueName, msgId, this.ts, this.internalType, this.type, this.originator, this.customerId,
this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.correlationId, this.partition, this.ctx, callback);
}
public TbMsg copyWithNewCtx() {
return new TbMsg(this.queueName, this.id, this.ts, this.internalType, this.type, this.originator, this.customerId,
this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.correlationId, this.partition, this.ctx.copy(), TbMsgCallback.EMPTY);
return copy()
.ctx(ctx.copy())
.callback(TbMsgCallback.EMPTY)
.build();
}
private TbMsg(String queueName, UUID id, long ts, TbMsgType internalType, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
@ -142,7 +134,7 @@ public final class TbMsg implements Serializable {
}
this.metaData = metaData;
this.dataType = dataType != null ? dataType : TbMsgDataType.JSON;
this.data = requireNonNull(data, "msg data is missing");
this.data = data;
this.ruleChainId = ruleChainId;
this.ruleNodeId = ruleNodeId;
this.correlationId = correlationId;
@ -238,6 +230,10 @@ public final class TbMsg implements Serializable {
}
}
public int getAndIncrementRuleNodeCounter() {
return ctx.getAndIncrementRuleNodeCounter();
}
public TbMsgCallback getCallback() {
// May be null in case of deserialization;
return Objects.requireNonNullElse(callback, TbMsgCallback.EMPTY);
@ -293,19 +289,7 @@ public final class TbMsg implements Serializable {
return false;
}
public static TbMsgBuilder newMsg() {
return new TbMsgBuilder();
}
public TbMsgBuilder transform() {
return new TbMsgTransformer(this);
}
public TbMsgBuilder copy() {
return new TbMsgBuilder(this);
}
private static class TbMsgTransformer extends TbMsgBuilder {
public static class TbMsgTransformer extends TbMsgBuilder {
TbMsgTransformer(TbMsg tbMsg) {
super(tbMsg);
@ -335,8 +319,8 @@ public final class TbMsg implements Serializable {
/*
* always copying ctx when transforming
* */
if (ctx != null) {
ctx = ctx.copy();
if (this.ctx != null) {
this.ctx = this.ctx.copy();
}
return super.build();
}
@ -472,7 +456,7 @@ public final class TbMsg implements Serializable {
}
public TbMsg build() {
return new TbMsg(queueName, id, ts, type, internalType, originator, customerId, metaData, dataType, data, ruleChainId, ruleNodeId, correlationId, partition, ctx, callback);
return new TbMsg(queueName, id, ts, internalType, type, originator, customerId, metaData, dataType, data, ruleChainId, ruleNodeId, correlationId, partition, ctx, callback);
}
public String toString() {