diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 33152b0964..e1717e3b9b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; @@ -300,7 +301,22 @@ class DefaultTbContext implements TbContext { } public TbMsg alarmActionMsg(Alarm alarm, RuleNodeId ruleNodeId, String action) { - return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action); + RuleChainId ruleChainId = null; + String queueName = ServiceQueue.MAIN; + if (EntityType.DEVICE.equals(alarm.getOriginator().getEntityType())) { + DeviceId deviceId = new DeviceId(alarm.getOriginator().getId()); + DeviceProfile deviceProfile = mainCtx.getDeviceProfileCache().get(getTenantId(), deviceId); + if (deviceProfile == null) { + log.warn("[{}] Device profile is null!", deviceId); + ruleChainId = null; + queueName = ServiceQueue.MAIN; + } else { + ruleChainId = deviceProfile.getDefaultRuleChainId(); + String defaultQueueName = deviceProfile.getDefaultQueueName(); + queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN; + } + } + return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action, queueName, ruleChainId); } public TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 51fd388af9..133447022b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -145,7 +145,7 @@ public class DefaultTbClusterService implements TbClusterService { tbMsg = transformMsg(tbMsg, deviceProfileCache.get(tenantId, new DeviceProfileId(entityId.getId()))); } } - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, entityId); log.trace("PUSHING msg: {} to:{}", tbMsg, tpi); ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 348e8021e4..66a3670ddd 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -120,7 +120,7 @@ public final class TbMsg implements Serializable { private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId, int ruleNodeExecCounter, TbMsgCallback callback) { this.id = id; - this.queueName = queueName; + this.queueName = queueName != null ? queueName : ServiceQueue.MAIN; if (ts > 0) { this.ts = ts; } else {