Imrpvements to the entity message routing based on the device profile

This commit is contained in:
Andrii Shvaika 2021-02-22 17:18:48 +02:00
parent 16f3146fd4
commit a4508aa193
3 changed files with 19 additions and 3 deletions

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.asset.Asset;
@ -300,7 +301,22 @@ class DefaultTbContext implements TbContext {
}
public TbMsg alarmActionMsg(Alarm alarm, RuleNodeId ruleNodeId, String action) {
return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action);
RuleChainId ruleChainId = null;
String queueName = ServiceQueue.MAIN;
if (EntityType.DEVICE.equals(alarm.getOriginator().getEntityType())) {
DeviceId deviceId = new DeviceId(alarm.getOriginator().getId());
DeviceProfile deviceProfile = mainCtx.getDeviceProfileCache().get(getTenantId(), deviceId);
if (deviceProfile == null) {
log.warn("[{}] Device profile is null!", deviceId);
ruleChainId = null;
queueName = ServiceQueue.MAIN;
} else {
ruleChainId = deviceProfile.getDefaultRuleChainId();
String defaultQueueName = deviceProfile.getDefaultQueueName();
queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN;
}
}
return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action, queueName, ruleChainId);
}
public <E, I extends EntityId> TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action) {

View File

@ -145,7 +145,7 @@ public class DefaultTbClusterService implements TbClusterService {
tbMsg = transformMsg(tbMsg, deviceProfileCache.get(tenantId, new DeviceProfileId(entityId.getId())));
}
}
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, entityId);
log.trace("PUSHING msg: {} to:{}", tbMsg, tpi);
ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())

View File

@ -120,7 +120,7 @@ public final class TbMsg implements Serializable {
private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
RuleChainId ruleChainId, RuleNodeId ruleNodeId, int ruleNodeExecCounter, TbMsgCallback callback) {
this.id = id;
this.queueName = queueName;
this.queueName = queueName != null ? queueName : ServiceQueue.MAIN;
if (ts > 0) {
this.ts = ts;
} else {