diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 92fdfc6e48..9404c50160 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -17,8 +17,11 @@ package org.thingsboard.server.actors.ruleChain; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import io.netty.channel.EventLoopGroup; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.RuleEngineAlarmService; @@ -49,6 +52,9 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.DataType; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.rule.RuleNode; @@ -84,6 +90,7 @@ import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -360,6 +367,57 @@ class DefaultTbContext implements TbContext { return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action, queueName, ruleChainId); } + public TbMsg attributeUpdateActionMsg(EntityId originator, RuleNodeId ruleNodeId, String scope, List attributes) { + ObjectNode entityNode = JacksonUtil.newObjectNode(); + if (attributes != null) { + attributes.forEach(attributeKvEntry -> addKvEntry(entityNode, attributeKvEntry)); + } + return attributeActionMsg(originator, ruleNodeId, scope, DataConstants.ATTRIBUTES_UPDATED, JacksonUtil.toString(entityNode)); + } + + public TbMsg attributeDeleteActionMsg(EntityId originator, RuleNodeId ruleNodeId, String scope, List keys) { + ObjectNode entityNode = JacksonUtil.newObjectNode(); + ArrayNode attrsArrayNode = entityNode.putArray("attributes"); + if (keys != null) { + keys.forEach(attrsArrayNode::add); + } + return attributeActionMsg(originator, ruleNodeId, scope, DataConstants.ATTRIBUTES_DELETED, JacksonUtil.toString(entityNode)); + } + + private TbMsg attributeActionMsg(EntityId originator, RuleNodeId ruleNodeId, String scope, String action, String msgData) { + RuleChainId ruleChainId = null; + String queueName = null; + if (EntityType.DEVICE.equals(originator.getEntityType())) { + DeviceId deviceId = new DeviceId(originator.getId()); + DeviceProfile deviceProfile = mainCtx.getDeviceProfileCache().get(getTenantId(), deviceId); + if (deviceProfile == null) { + log.warn("[{}] Device profile is null!", deviceId); + } else { + ruleChainId = deviceProfile.getDefaultRuleChainId(); + queueName = deviceProfile.getDefaultQueueName(); + } + } + TbMsgMetaData tbMsgMetaData = getActionMetaData(ruleNodeId); + tbMsgMetaData.putValue("scope", scope); + return entityActionMsg(originator, tbMsgMetaData, msgData, action, queueName, ruleChainId); + } + + private void addKvEntry(ObjectNode entityNode, KvEntry kvEntry) { + if (kvEntry.getDataType() == DataType.BOOLEAN) { + kvEntry.getBooleanValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value)); + } else if (kvEntry.getDataType() == DataType.DOUBLE) { + kvEntry.getDoubleValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value)); + } else if (kvEntry.getDataType() == DataType.LONG) { + kvEntry.getLongValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value)); + } else if (kvEntry.getDataType() == DataType.JSON) { + if (kvEntry.getJsonValue().isPresent()) { + entityNode.set(kvEntry.getKey(), JacksonUtil.valueToTree(kvEntry.getJsonValue().get())); + } + } else { + entityNode.put(kvEntry.getKey(), kvEntry.getValueAsString()); + } + } + @Override public void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) { mainCtx.getClusterService().onEdgeEventUpdate(tenantId, edgeId); @@ -371,12 +429,16 @@ class DefaultTbContext implements TbContext { public TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action, String queueName, RuleChainId ruleChainId) { try { - return TbMsg.newMsg(queueName, action, id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity)), ruleChainId, null); + return entityActionMsg(id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity)), action, queueName, ruleChainId); } catch (JsonProcessingException | IllegalArgumentException e) { throw new RuntimeException("Failed to process " + id.getEntityType().name().toLowerCase() + " " + action + " msg: " + e); } } + public TbMsg entityActionMsg(I id, TbMsgMetaData msgMetaData, String msgData, String action, String queueName, RuleChainId ruleChainId) { + return TbMsg.newMsg(queueName, action, id, msgMetaData, msgData, ruleChainId, null); + } + @Override public RuleNodeId getSelfId() { return nodeCtx.getSelf().getId(); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 1dac01f788..cbc4191275 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -29,10 +29,10 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.rule.RuleNode; @@ -59,6 +59,7 @@ import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; +import java.util.List; import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -180,6 +181,10 @@ public interface TbContext { // TODO: Does this changes the message? TbMsg alarmActionMsg(Alarm alarm, RuleNodeId ruleNodeId, String action); + TbMsg attributeUpdateActionMsg(EntityId originator, RuleNodeId ruleNodeId, String scope, List attributes); + + TbMsg attributeDeleteActionMsg(EntityId originator, RuleNodeId ruleNodeId, String scope, List keys); + void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId); /* diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/AttributeDeleteNodeCallback.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/AttributeDeleteNodeCallback.java new file mode 100644 index 0000000000..dc0c9c36f4 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/AttributeDeleteNodeCallback.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.server.common.msg.TbMsg; + +import javax.annotation.Nullable; +import java.util.List; + +@Slf4j +public class AttributeDeleteNodeCallback extends TelemetryNodeCallback { + + private String scope; + private List keys; + + public AttributeDeleteNodeCallback(TbContext ctx, TbMsg msg, String scope, List keys) { + super(ctx, msg); + this.scope = scope; + this.keys = keys; + } + + @Override + public void onSuccess(@Nullable Void result) { + TbContext ctx = this.getCtx(); + TbMsg tbMsg = this.getMsg(); + ctx.enqueue(ctx.attributeDeleteActionMsg(tbMsg.getOriginator(), ctx.getSelfId(), scope, keys), + () -> ctx.tellSuccess(tbMsg), + throwable -> ctx.tellFailure(tbMsg, throwable)); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/AttributeUpdateNodeCallback.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/AttributeUpdateNodeCallback.java new file mode 100644 index 0000000000..fbbe44738c --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/AttributeUpdateNodeCallback.java @@ -0,0 +1,44 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry; + +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.msg.TbMsg; + +import javax.annotation.Nullable; +import java.util.List; + +public class AttributeUpdateNodeCallback extends TelemetryNodeCallback { + + private String scope; + private List attributes; + + public AttributeUpdateNodeCallback(TbContext ctx, TbMsg msg, String scope, List attributes) { + super(ctx, msg); + this.scope = scope; + this.attributes = attributes; + } + + @Override + public void onSuccess(@Nullable Void result) { + TbContext ctx = this.getCtx(); + TbMsg tbMsg = this.getMsg(); + ctx.enqueue(ctx.attributeUpdateActionMsg(tbMsg.getOriginator(), ctx.getSelfId(), scope, attributes), + () -> ctx.tellSuccess(tbMsg), + throwable -> ctx.tellFailure(tbMsg, throwable)); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributes.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributes.java index be82be5ecc..32c6ce79fa 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributes.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributes.java @@ -45,11 +45,13 @@ import java.util.stream.Collectors; public class TbMsgDeleteAttributes implements TbNode { TbMsgDeleteAttributesConfiguration config; + String scope; List keys; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbMsgDeleteAttributesConfiguration.class); + this.scope = config.getScope(); this.keys = config.getKeys(); } @@ -63,12 +65,7 @@ public class TbMsgDeleteAttributes implements TbNode { if (keysToDelete.isEmpty()) { ctx.tellSuccess(msg); } else { - ctx.getTelemetryService().deleteAndNotify(ctx.getTenantId(), msg.getOriginator(), config.getScope(), keysToDelete, new TelemetryNodeCallback(ctx, msg)); + ctx.getTelemetryService().deleteAndNotify(ctx.getTenantId(), msg.getOriginator(), scope, keysToDelete, new AttributeDeleteNodeCallback(ctx, msg, scope, keys)); } } - - @Override - public void destroy() { - - } }