From 030d80d05a27345db4eef03f3ff51dea45c56d4a Mon Sep 17 00:00:00 2001 From: Viacheslav Klimov Date: Wed, 21 Apr 2021 10:00:25 +0300 Subject: [PATCH] CustomerId to TbMsg and SessionInfo --- .../actors/ruleChain/DefaultTbContext.java | 8 ++++- .../server/controller/BaseController.java | 2 +- .../device/DeviceProvisionServiceImpl.java | 4 +-- .../service/queue/TbClusterService.java | 1 - .../transport/DefaultTransportApiService.java | 4 +-- .../thingsboard/server/common/msg/TbMsg.java | 29 ++++++++++++++----- .../server/LwM2mTransportContextServer.java | 2 ++ .../server/LwM2mTransportServiceImpl.java | 2 ++ .../mqtt/session/GatewayDeviceSessionCtx.java | 2 ++ .../rule/engine/api/TbContext.java | 5 ++-- .../TbCopyAttributesToEntityViewNode.java | 2 +- .../rule/engine/action/TbMsgCountNode.java | 10 +++---- .../rule/engine/debug/TbMsgGeneratorNode.java | 8 ++--- .../rule/engine/delay/TbMsgDelayNode.java | 2 +- .../rule/engine/profile/AlarmState.java | 15 +++++----- .../engine/profile/TbDeviceProfileNode.java | 8 ++--- .../rule/engine/rpc/TbSendRPCRequestNode.java | 8 ++--- 17 files changed, 68 insertions(+), 44 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index ae7d8c492e..1ddb99d7b7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -41,6 +41,7 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; @@ -269,7 +270,12 @@ class DefaultTbContext implements TbContext { @Override public TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) { - return TbMsg.newMsg(queueName, type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); + return newMsg(queueName, type, originator, null, metaData, data); + } + + @Override + public TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) { + return TbMsg.newMsg(queueName, type, originator, customerId, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); } @Override diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index bc9b91cec4..6a1ddcff8b 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -919,7 +919,7 @@ public abstract class BaseController { entityNode.put("endTs", extractParameter(Long.class, 2, additionalInfo)); } } - TbMsg tbMsg = TbMsg.newMsg(msgType, entityId, metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode)); + TbMsg tbMsg = TbMsg.newMsg(msgType, entityId, customerId, metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode)); TenantId tenantId = user.getTenantId(); if (tenantId.isNullUid()) { if (entity instanceof HasTenantId) { diff --git a/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java b/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java index 3fea206c3b..4ba4080a74 100644 --- a/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java +++ b/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java @@ -230,7 +230,7 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService { private void pushProvisionEventToRuleEngine(ProvisionRequest request, Device device, String type) { try { JsonNode entityNode = JacksonUtil.valueToTree(request); - TbMsg msg = TbMsg.newMsg(type, device.getId(), createTbMsgMetaData(device), JacksonUtil.toString(entityNode)); + TbMsg msg = TbMsg.newMsg(type, device.getId(), device.getCustomerId(), createTbMsgMetaData(device), JacksonUtil.toString(entityNode)); sendToRuleEngine(device.getTenantId(), msg, null); } catch (IllegalArgumentException e) { log.warn("[{}] Failed to push device action to rule engine: {}", device.getId(), type, e); @@ -240,7 +240,7 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService { private void pushDeviceCreatedEventToRuleEngine(Device device) { try { ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(device); - TbMsg msg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, device.getId(), createTbMsgMetaData(device), JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode)); + TbMsg msg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, device.getId(), device.getCustomerId(), createTbMsgMetaData(device), JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode)); sendToRuleEngine(device.getTenantId(), msg, null); } catch (JsonProcessingException | IllegalArgumentException e) { log.warn("[{}] Failed to push device action to rule engine: {}", device.getId(), DataConstants.ENTITY_CREATED, e); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java index 6448a77721..c5848bed58 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java @@ -22,7 +22,6 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; -import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index c7f4db3854..0002fe2dba 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -412,8 +412,8 @@ public class DefaultTransportApiService implements TransportApiService { return DeviceInfoProto.newBuilder() .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits()) - .setCustomerIdMSB(device.getCustomerId().getId().getMostSignificantBits()) - .setCustomerIdLSB(device.getCustomerId().getId().getLeastSignificantBits()) + .setCustomerIdMSB(Optional.ofNullable(device.getCustomerId()).map(customerId -> customerId.getId().getMostSignificantBits()).orElse(0L)) + .setCustomerIdLSB(Optional.ofNullable(device.getCustomerId()).map(customerId -> customerId.getId().getLeastSignificantBits()).orElse(0L)) .setDeviceIdMSB(device.getId().getId().getMostSignificantBits()) .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits()) .setDeviceName(device.getName()) diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index e1db40066d..c843fc6db6 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -19,10 +19,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import lombok.AccessLevel; -import lombok.Builder; import lombok.Data; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; @@ -67,8 +67,7 @@ public final class TbMsg implements Serializable { transient private final TbMsgCallback callback; public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, - metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, 0, TbMsgCallback.EMPTY); + return newMsg(queueName, type, originator, null, metaData, data, ruleChainId, ruleNodeId); } public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { @@ -77,13 +76,21 @@ public final class TbMsg implements Serializable { } public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) { - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY); + return newMsg(type, originator, null, metaData, data); + } + + public static TbMsg newMsg(String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) { + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY); } // REALLY NEW MSG public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) { - return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY); + return newMsg(queueName, type, originator, null, metaData, data); + } + + public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) { + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY); } public static TbMsg newMsg(String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data) { @@ -91,7 +98,7 @@ public final class TbMsg implements Serializable { } public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data) { - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, null, metaData.copy(), dataType, data, null, null, 0, TbMsgCallback.EMPTY); + return newMsg(type, originator, null, metaData, dataType, data); } // For Tests only @@ -145,7 +152,15 @@ public final class TbMsg implements Serializable { } this.type = type; this.originator = originator; - this.customerId = (customerId == null || customerId.isNullUid()) ? null : customerId; + if (customerId == null || customerId.isNullUid()) { + if (originator.getEntityType() == EntityType.CUSTOMER) { + this.customerId = (CustomerId) originator; + } else { + this.customerId = null; + } + } else { + this.customerId = customerId; + } this.metaData = metaData; this.dataType = dataType; this.data = data; diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportContextServer.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportContextServer.java index ed0a1d16e6..d3ccf0e86d 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportContextServer.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportContextServer.java @@ -141,6 +141,8 @@ public class LwM2mTransportContextServer extends TransportContext { .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB()) .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB()) .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB()) + .setCustomerIdMSB(msg.getDeviceInfo().getCustomerIdMSB()) + .setCustomerIdLSB(msg.getDeviceInfo().getCustomerIdLSB()) .setDeviceName(msg.getDeviceInfo().getDeviceName()) .setDeviceType(msg.getDeviceInfo().getDeviceType()) .setDeviceProfileIdLSB(msg.getDeviceInfo().getDeviceProfileIdLSB()) diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServiceImpl.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServiceImpl.java index 05f386ad1a..4a432ca131 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServiceImpl.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServiceImpl.java @@ -1177,6 +1177,8 @@ public class LwM2mTransportServiceImpl implements LwM2mTransportService { .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB()) .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB()) .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB()) + .setCustomerIdMSB(msg.getDeviceInfo().getCustomerIdMSB()) + .setCustomerIdLSB(msg.getDeviceInfo().getCustomerIdLSB()) .setDeviceName(msg.getDeviceInfo().getDeviceName()) .setDeviceType(msg.getDeviceInfo().getDeviceType()) .setDeviceProfileIdLSB(msg.getDeviceInfo().getDeviceProfileIdLSB()) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index a7d5499cfb..e8d9a6b6d4 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -45,6 +45,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple .setDeviceIdLSB(deviceInfo.getDeviceId().getId().getLeastSignificantBits()) .setTenantIdMSB(deviceInfo.getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(deviceInfo.getTenantId().getId().getLeastSignificantBits()) + .setCustomerIdMSB(deviceInfo.getCustomerId().getId().getMostSignificantBits()) + .setCustomerIdLSB(deviceInfo.getCustomerId().getId().getLeastSignificantBits()) .setDeviceName(deviceInfo.getDeviceName()) .setDeviceType(deviceInfo.getDeviceType()) .setGwSessionIdMSB(parent.getSessionId().getMostSignificantBits()) diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index e4a71ee33d..7a0e3cce63 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -16,16 +16,15 @@ package org.thingsboard.rule.engine.api; import io.netty.channel.EventLoopGroup; -import org.springframework.data.redis.core.RedisTemplate; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.api.sms.SmsSenderFactory; -import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; @@ -144,6 +143,8 @@ public interface TbContext { TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data); + TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data); + TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data); TbMsg customerCreatedMsg(Customer customer, RuleNodeId ruleNodeId); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java index 8282c74159..60b25574d4 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java @@ -136,7 +136,7 @@ public class TbCopyAttributesToEntityViewNode implements TbNode { } private void transformAndTellNext(TbContext ctx, TbMsg msg, EntityView entityView) { - ctx.enqueueForTellNext(ctx.newMsg(msg.getQueueName(), msg.getType(), entityView.getId(), msg.getMetaData(), msg.getData()), SUCCESS); + ctx.enqueueForTellNext(ctx.newMsg(msg.getQueueName(), msg.getType(), entityView.getId(), msg.getCustomerId(), msg.getMetaData(), msg.getData()), SUCCESS); } private boolean attributeContainsInEntityView(String scope, String attrKey, EntityView entityView) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbMsgCountNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbMsgCountNode.java index ca98e104c5..e7d018f65f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbMsgCountNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbMsgCountNode.java @@ -63,7 +63,7 @@ public class TbMsgCountNode implements TbNode { TbMsgCountNodeConfiguration config = TbNodeUtils.convert(configuration, TbMsgCountNodeConfiguration.class); this.delay = TimeUnit.SECONDS.toMillis(config.getInterval()); this.telemetryPrefix = config.getTelemetryPrefix(); - scheduleTickMsg(ctx); + scheduleTickMsg(ctx, null); } @@ -78,23 +78,23 @@ public class TbMsgCountNode implements TbNode { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("delta", Long.toString(System.currentTimeMillis() - lastScheduledTs + delay)); - TbMsg tbMsg = TbMsg.newMsg(msg.getQueueName(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), metaData, gson.toJson(telemetryJson)); + TbMsg tbMsg = TbMsg.newMsg(msg.getQueueName(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), msg.getCustomerId(), metaData, gson.toJson(telemetryJson)); ctx.enqueueForTellNext(tbMsg, SUCCESS); - scheduleTickMsg(ctx); + scheduleTickMsg(ctx, tbMsg); } else { messagesProcessed.incrementAndGet(); ctx.ack(msg); } } - private void scheduleTickMsg(TbContext ctx) { + private void scheduleTickMsg(TbContext ctx, TbMsg msg) { long curTs = System.currentTimeMillis(); if (lastScheduledTs == 0L) { lastScheduledTs = curTs; } lastScheduledTs = lastScheduledTs + delay; long curDelay = Math.max(0L, (lastScheduledTs - curTs)); - TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_COUNT_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), ""); + TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_COUNT_NODE_MSG, ctx.getSelfId(), msg != null ? msg.getCustomerId() : null, new TbMsgMetaData(), ""); nextTickId = tickMsg.getId(); ctx.tellSelf(tickMsg, curDelay); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index fc6ee959e0..b5a47b4a0d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -100,7 +100,7 @@ public class TbMsgGeneratorNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { if (initialized && msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) { - withCallback(generate(ctx), + withCallback(generate(ctx, msg), m -> { if (initialized && (config.getMsgCount() == TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT || currentMsgCount < config.getMsgCount())) { ctx.enqueueForTellNext(m, SUCCESS); @@ -130,16 +130,16 @@ public class TbMsgGeneratorNode implements TbNode { ctx.tellSelf(tickMsg, curDelay); } - private ListenableFuture generate(TbContext ctx) { + private ListenableFuture generate(TbContext ctx, TbMsg msg) { return ctx.getJsExecutor().executeAsync(() -> { if (prevMsg == null) { - prevMsg = ctx.newMsg(ServiceQueue.MAIN, "", originatorId, new TbMsgMetaData(), "{}"); + prevMsg = ctx.newMsg(ServiceQueue.MAIN, "", originatorId, msg.getCustomerId(), new TbMsgMetaData(), "{}"); } if (initialized) { ctx.logJsEvalRequest(); TbMsg generated = jsEngine.executeGenerate(prevMsg); ctx.logJsEvalResponse(); - prevMsg = ctx.newMsg(ServiceQueue.MAIN, generated.getType(), originatorId, generated.getMetaData(), generated.getData()); + prevMsg = ctx.newMsg(ServiceQueue.MAIN, generated.getType(), originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData()); } return prevMsg; }); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java index 730cb734be..4a3113baca 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java @@ -70,7 +70,7 @@ public class TbMsgDelayNode implements TbNode { } else { if (pendingMsgs.size() < config.getMaxPendingMsgs()) { pendingMsgs.put(msg.getId(), msg); - TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_DELAY_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), msg.getId().toString()); + TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_DELAY_NODE_MSG, ctx.getSelfId(), msg.getCustomerId(), new TbMsgMetaData(), msg.getId().toString()); ctx.tellSelf(tickMsg, getDelay(msg)); ctx.ack(msg); } else { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmState.java index ceee362fc8..445b9653a2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmState.java @@ -74,15 +74,15 @@ class AlarmState { lastMsgMetaData = msg.getMetaData(); lastMsgQueueName = msg.getQueueName(); this.dataSnapshot = data; - return createOrClearAlarms(ctx, data, update, AlarmRuleState::eval); + return createOrClearAlarms(ctx, msg, data, update, AlarmRuleState::eval); } public boolean process(TbContext ctx, long ts) throws ExecutionException, InterruptedException { initCurrentAlarm(ctx); - return createOrClearAlarms(ctx, ts, null, AlarmRuleState::eval); + return createOrClearAlarms(ctx, null, ts, null, AlarmRuleState::eval); } - public boolean createOrClearAlarms(TbContext ctx, T data, SnapshotUpdate update, BiFunction evalFunction) { + public boolean createOrClearAlarms(TbContext ctx, TbMsg msg, T data, SnapshotUpdate update, BiFunction evalFunction) { boolean stateUpdate = false; AlarmRuleState resultState = null; log.debug("[{}] processing update: {}", alarmDefinition.getId(), data); @@ -103,7 +103,7 @@ class AlarmState { if (resultState != null) { TbAlarmResult result = calculateAlarmResult(ctx, resultState); if (result != null) { - pushMsg(ctx, result, resultState); + pushMsg(ctx, msg, result, resultState); } stateUpdate = clearAlarmState(stateUpdate, clearState); } else if (currentAlarm != null && clearState != null) { @@ -122,7 +122,7 @@ class AlarmState { ); DonAsynchron.withCallback(alarmClearOperationResult, result -> { - pushMsg(ctx, new TbAlarmResult(false, false, true, result.getAlarm()), clearState); + pushMsg(ctx, msg, new TbAlarmResult(false, false, true, result.getAlarm()), clearState); }, throwable -> { throw new RuntimeException(throwable); @@ -165,7 +165,7 @@ class AlarmState { } } - public void pushMsg(TbContext ctx, TbAlarmResult alarmResult, AlarmRuleState ruleState) { + public void pushMsg(TbContext ctx, TbMsg msg, TbAlarmResult alarmResult, AlarmRuleState ruleState) { JsonNode jsonNodes = JacksonUtil.valueToTree(alarmResult.getAlarm()); String data = jsonNodes.toString(); TbMsgMetaData metaData = lastMsgMetaData != null ? lastMsgMetaData.copy() : new TbMsgMetaData(); @@ -185,7 +185,8 @@ class AlarmState { metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString()); } setAlarmConditionMetadata(ruleState, metaData); - TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN, "ALARM", originator, metaData, data); + TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN, "ALARM", + originator, msg != null ? msg.getCustomerId() : null, metaData, data); ctx.tellNext(newMsg, relationType); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java index a733fd104a..ebf72d6b3a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java @@ -74,7 +74,7 @@ public class TbDeviceProfileNode implements TbNode { this.config = TbNodeUtils.convert(configuration, TbDeviceProfileNodeConfiguration.class); this.cache = ctx.getDeviceProfileCache(); this.ctx = ctx; - scheduleAlarmHarvesting(ctx); + scheduleAlarmHarvesting(ctx, null); ctx.addDeviceProfileListeners(this::onProfileUpdate, this::onDeviceUpdate); if (config.isFetchAlarmRulesStateOnStart()) { log.info("[{}] Fetching alarm rule state", ctx.getSelfId()); @@ -108,7 +108,7 @@ public class TbDeviceProfileNode implements TbNode { public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { EntityType originatorType = msg.getOriginator().getEntityType(); if (msg.getType().equals(PERIODIC_MSG_TYPE)) { - scheduleAlarmHarvesting(ctx); + scheduleAlarmHarvesting(ctx, msg); harvestAlarms(ctx, System.currentTimeMillis()); } else if (msg.getType().equals(PROFILE_UPDATE_MSG_TYPE)) { updateProfile(ctx, new DeviceProfileId(UUID.fromString(msg.getData()))); @@ -168,8 +168,8 @@ public class TbDeviceProfileNode implements TbNode { return deviceState; } - protected void scheduleAlarmHarvesting(TbContext ctx) { - TbMsg periodicCheck = TbMsg.newMsg(PERIODIC_MSG_TYPE, ctx.getTenantId(), TbMsgMetaData.EMPTY, "{}"); + protected void scheduleAlarmHarvesting(TbContext ctx, TbMsg msg) { + TbMsg periodicCheck = TbMsg.newMsg(PERIODIC_MSG_TYPE, ctx.getTenantId(), msg != null ? msg.getCustomerId() : null, TbMsgMetaData.EMPTY, "{}"); ctx.tellSelf(periodicCheck, TimeUnit.MINUTES.toMillis(1)); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java index 07952359a1..31bd913f47 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java @@ -16,10 +16,6 @@ package org.thingsboard.rule.engine.rpc; import com.datastax.oss.driver.api.core.uuid.Uuids; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; @@ -116,10 +112,10 @@ public class TbSendRPCRequestNode implements TbNode { ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { if (!ruleEngineDeviceRpcResponse.getError().isPresent()) { - TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS); } else { - TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name())); } });