modify callback

This commit is contained in:
Yuriy Lytvynchuk 2022-09-23 16:39:53 +03:00
parent 01d4295a22
commit 12148c8a60
5 changed files with 161 additions and 8 deletions

View File

@ -17,8 +17,11 @@ package org.thingsboard.server.actors.ruleChain;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.channel.EventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.RuleEngineAlarmService;
@ -49,6 +52,9 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleNode;
@ -84,6 +90,7 @@ import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -360,6 +367,57 @@ class DefaultTbContext implements TbContext {
return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action, queueName, ruleChainId);
}
public TbMsg attributeUpdateActionMsg(EntityId originator, RuleNodeId ruleNodeId, String scope, List<AttributeKvEntry> attributes) {
ObjectNode entityNode = JacksonUtil.newObjectNode();
if (attributes != null) {
attributes.forEach(attributeKvEntry -> addKvEntry(entityNode, attributeKvEntry));
}
return attributeActionMsg(originator, ruleNodeId, scope, DataConstants.ATTRIBUTES_UPDATED, JacksonUtil.toString(entityNode));
}
public TbMsg attributeDeleteActionMsg(EntityId originator, RuleNodeId ruleNodeId, String scope, List<String> keys) {
ObjectNode entityNode = JacksonUtil.newObjectNode();
ArrayNode attrsArrayNode = entityNode.putArray("attributes");
if (keys != null) {
keys.forEach(attrsArrayNode::add);
}
return attributeActionMsg(originator, ruleNodeId, scope, DataConstants.ATTRIBUTES_DELETED, JacksonUtil.toString(entityNode));
}
private TbMsg attributeActionMsg(EntityId originator, RuleNodeId ruleNodeId, String scope, String action, String msgData) {
RuleChainId ruleChainId = null;
String queueName = null;
if (EntityType.DEVICE.equals(originator.getEntityType())) {
DeviceId deviceId = new DeviceId(originator.getId());
DeviceProfile deviceProfile = mainCtx.getDeviceProfileCache().get(getTenantId(), deviceId);
if (deviceProfile == null) {
log.warn("[{}] Device profile is null!", deviceId);
} else {
ruleChainId = deviceProfile.getDefaultRuleChainId();
queueName = deviceProfile.getDefaultQueueName();
}
}
TbMsgMetaData tbMsgMetaData = getActionMetaData(ruleNodeId);
tbMsgMetaData.putValue("scope", scope);
return entityActionMsg(originator, tbMsgMetaData, msgData, action, queueName, ruleChainId);
}
private void addKvEntry(ObjectNode entityNode, KvEntry kvEntry) {
if (kvEntry.getDataType() == DataType.BOOLEAN) {
kvEntry.getBooleanValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
} else if (kvEntry.getDataType() == DataType.DOUBLE) {
kvEntry.getDoubleValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
} else if (kvEntry.getDataType() == DataType.LONG) {
kvEntry.getLongValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
} else if (kvEntry.getDataType() == DataType.JSON) {
if (kvEntry.getJsonValue().isPresent()) {
entityNode.set(kvEntry.getKey(), JacksonUtil.valueToTree(kvEntry.getJsonValue().get()));
}
} else {
entityNode.put(kvEntry.getKey(), kvEntry.getValueAsString());
}
}
@Override
public void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) {
mainCtx.getClusterService().onEdgeEventUpdate(tenantId, edgeId);
@ -371,12 +429,16 @@ class DefaultTbContext implements TbContext {
public <E, I extends EntityId> TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action, String queueName, RuleChainId ruleChainId) {
try {
return TbMsg.newMsg(queueName, action, id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity)), ruleChainId, null);
return entityActionMsg(id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity)), action, queueName, ruleChainId);
} catch (JsonProcessingException | IllegalArgumentException e) {
throw new RuntimeException("Failed to process " + id.getEntityType().name().toLowerCase() + " " + action + " msg: " + e);
}
}
public <I extends EntityId> TbMsg entityActionMsg(I id, TbMsgMetaData msgMetaData, String msgData, String action, String queueName, RuleChainId ruleChainId) {
return TbMsg.newMsg(queueName, action, id, msgMetaData, msgData, ruleChainId, null);
}
@Override
public RuleNodeId getSelfId() {
return nodeCtx.getSelf().getId();

View File

@ -29,10 +29,10 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.QueueId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleNode;
@ -59,6 +59,7 @@ import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -180,6 +181,10 @@ public interface TbContext {
// TODO: Does this changes the message?
TbMsg alarmActionMsg(Alarm alarm, RuleNodeId ruleNodeId, String action);
TbMsg attributeUpdateActionMsg(EntityId originator, RuleNodeId ruleNodeId, String scope, List<AttributeKvEntry> attributes);
TbMsg attributeDeleteActionMsg(EntityId originator, RuleNodeId ruleNodeId, String scope, List<String> keys);
void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId);
/*

View File

@ -0,0 +1,45 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.common.msg.TbMsg;
import javax.annotation.Nullable;
import java.util.List;
@Slf4j
public class AttributeDeleteNodeCallback extends TelemetryNodeCallback {
private String scope;
private List<String> keys;
public AttributeDeleteNodeCallback(TbContext ctx, TbMsg msg, String scope, List<String> keys) {
super(ctx, msg);
this.scope = scope;
this.keys = keys;
}
@Override
public void onSuccess(@Nullable Void result) {
TbContext ctx = this.getCtx();
TbMsg tbMsg = this.getMsg();
ctx.enqueue(ctx.attributeDeleteActionMsg(tbMsg.getOriginator(), ctx.getSelfId(), scope, keys),
() -> ctx.tellSuccess(tbMsg),
throwable -> ctx.tellFailure(tbMsg, throwable));
}
}

View File

@ -0,0 +1,44 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.telemetry;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.msg.TbMsg;
import javax.annotation.Nullable;
import java.util.List;
public class AttributeUpdateNodeCallback extends TelemetryNodeCallback {
private String scope;
private List<AttributeKvEntry> attributes;
public AttributeUpdateNodeCallback(TbContext ctx, TbMsg msg, String scope, List<AttributeKvEntry> attributes) {
super(ctx, msg);
this.scope = scope;
this.attributes = attributes;
}
@Override
public void onSuccess(@Nullable Void result) {
TbContext ctx = this.getCtx();
TbMsg tbMsg = this.getMsg();
ctx.enqueue(ctx.attributeUpdateActionMsg(tbMsg.getOriginator(), ctx.getSelfId(), scope, attributes),
() -> ctx.tellSuccess(tbMsg),
throwable -> ctx.tellFailure(tbMsg, throwable));
}
}

View File

@ -45,11 +45,13 @@ import java.util.stream.Collectors;
public class TbMsgDeleteAttributes implements TbNode {
TbMsgDeleteAttributesConfiguration config;
String scope;
List<String> keys;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbMsgDeleteAttributesConfiguration.class);
this.scope = config.getScope();
this.keys = config.getKeys();
}
@ -63,12 +65,7 @@ public class TbMsgDeleteAttributes implements TbNode {
if (keysToDelete.isEmpty()) {
ctx.tellSuccess(msg);
} else {
ctx.getTelemetryService().deleteAndNotify(ctx.getTenantId(), msg.getOriginator(), config.getScope(), keysToDelete, new TelemetryNodeCallback(ctx, msg));
ctx.getTelemetryService().deleteAndNotify(ctx.getTenantId(), msg.getOriginator(), scope, keysToDelete, new AttributeDeleteNodeCallback(ctx, msg, scope, keys));
}
}
@Override
public void destroy() {
}
}