Ack messages in case no edges are related. Refactored push to cloud/edge nodes
This commit is contained in:
		
							parent
							
								
									fe16336bbb
								
							
						
					
					
						commit
						ef4adbda7b
					
				@ -0,0 +1,177 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2022 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.edge;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.core.type.TypeReference;
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.DataConstants;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
 | 
			
		||||
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfiguration, S, U> implements TbNode {
 | 
			
		||||
 | 
			
		||||
    protected T config;
 | 
			
		||||
 | 
			
		||||
    private static final String SCOPE = "scope";
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, getConfigClazz());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        if (getIgnoredMessageSource().equalsIgnoreCase(msg.getMetaData().getValue(DataConstants.MSG_SOURCE_KEY))) {
 | 
			
		||||
            log.debug("Ignoring msg from the {}, msg [{}]", getIgnoredMessageSource(), msg);
 | 
			
		||||
            ctx.ack(msg);
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        if (isSupportedOriginator(msg.getOriginator().getEntityType())) {
 | 
			
		||||
            if (isSupportedMsgType(msg.getType())) {
 | 
			
		||||
                processMsg(ctx, msg);
 | 
			
		||||
            } else {
 | 
			
		||||
                String errMsg = String.format("Unsupported msg type %s", msg.getType());
 | 
			
		||||
                log.debug(errMsg);
 | 
			
		||||
                ctx.tellFailure(msg, new RuntimeException(errMsg));
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            String errMsg = String.format("Unsupported originator type %s", msg.getOriginator().getEntityType());
 | 
			
		||||
            log.debug(errMsg);
 | 
			
		||||
            ctx.tellFailure(msg, new RuntimeException(errMsg));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void destroy() {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected S buildEvent(TbMsg msg, TbContext ctx) {
 | 
			
		||||
        String msgType = msg.getType();
 | 
			
		||||
        if (DataConstants.ALARM.equals(msgType)) {
 | 
			
		||||
            return buildEvent(ctx.getTenantId(), EdgeEventActionType.ADDED, getUUIDFromMsgData(msg), getAlarmEventType(), null);
 | 
			
		||||
        } else {
 | 
			
		||||
            U eventTypeByEntityType = getEventTypeByEntityType(msg.getOriginator().getEntityType());
 | 
			
		||||
            if (eventTypeByEntityType == null) {
 | 
			
		||||
                return null;
 | 
			
		||||
            }
 | 
			
		||||
            EdgeEventActionType actionType = getEdgeEventActionTypeByMsgType(msgType);
 | 
			
		||||
            Map<String, Object> entityBody = new HashMap<>();
 | 
			
		||||
            Map<String, String> metadata = msg.getMetaData().getData();
 | 
			
		||||
            JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData());
 | 
			
		||||
            switch (actionType) {
 | 
			
		||||
                case ATTRIBUTES_UPDATED:
 | 
			
		||||
                case POST_ATTRIBUTES:
 | 
			
		||||
                    entityBody.put("kv", dataJson);
 | 
			
		||||
                    entityBody.put(SCOPE, getScope(metadata));
 | 
			
		||||
                    if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)) {
 | 
			
		||||
                        entityBody.put("isPostAttributes", true);
 | 
			
		||||
                    }
 | 
			
		||||
                    break;
 | 
			
		||||
                case ATTRIBUTES_DELETED:
 | 
			
		||||
                    List<String> keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() {});
 | 
			
		||||
                    entityBody.put("keys", keys);
 | 
			
		||||
                    entityBody.put(SCOPE, getScope(metadata));
 | 
			
		||||
                    break;
 | 
			
		||||
                case TIMESERIES_UPDATED:
 | 
			
		||||
                    entityBody.put("data", dataJson);
 | 
			
		||||
                    entityBody.put("ts", metadata.get("ts"));
 | 
			
		||||
                    break;
 | 
			
		||||
            }
 | 
			
		||||
            return buildEvent(ctx.getTenantId(), actionType, msg.getOriginator().getId(), eventTypeByEntityType, JacksonUtil.valueToTree(entityBody));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    abstract S buildEvent(TenantId tenantId, EdgeEventActionType edgeEventAction, UUID entityId, U edgeEventType, JsonNode entityBody);
 | 
			
		||||
 | 
			
		||||
    abstract U getEventTypeByEntityType(EntityType entityType);
 | 
			
		||||
 | 
			
		||||
    abstract U getAlarmEventType();
 | 
			
		||||
 | 
			
		||||
    abstract String getIgnoredMessageSource();
 | 
			
		||||
 | 
			
		||||
    abstract protected Class<T> getConfigClazz();
 | 
			
		||||
 | 
			
		||||
    abstract void processMsg(TbContext ctx, TbMsg msg);
 | 
			
		||||
 | 
			
		||||
    protected UUID getUUIDFromMsgData(TbMsg msg) {
 | 
			
		||||
        JsonNode data = JacksonUtil.toJsonNode(msg.getData()).get("id");
 | 
			
		||||
        String id = JacksonUtil.convertValue(data.get("id"), String.class);
 | 
			
		||||
        return UUID.fromString(id);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected String getScope(Map<String, String> metadata) {
 | 
			
		||||
        String scope = metadata.get(SCOPE);
 | 
			
		||||
        if (StringUtils.isEmpty(scope)) {
 | 
			
		||||
            scope = config.getScope();
 | 
			
		||||
        }
 | 
			
		||||
        return scope;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected EdgeEventActionType getEdgeEventActionTypeByMsgType(String msgType) {
 | 
			
		||||
        EdgeEventActionType actionType;
 | 
			
		||||
        if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)) {
 | 
			
		||||
            actionType = EdgeEventActionType.TIMESERIES_UPDATED;
 | 
			
		||||
        } else if (DataConstants.ATTRIBUTES_UPDATED.equals(msgType)) {
 | 
			
		||||
            actionType = EdgeEventActionType.ATTRIBUTES_UPDATED;
 | 
			
		||||
        } else if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)) {
 | 
			
		||||
            actionType = EdgeEventActionType.POST_ATTRIBUTES;
 | 
			
		||||
        } else {
 | 
			
		||||
            actionType = EdgeEventActionType.ATTRIBUTES_DELETED;
 | 
			
		||||
        }
 | 
			
		||||
        return actionType;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected boolean isSupportedMsgType(String msgType) {
 | 
			
		||||
        return SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)
 | 
			
		||||
                || SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)
 | 
			
		||||
                || DataConstants.ATTRIBUTES_UPDATED.equals(msgType)
 | 
			
		||||
                || DataConstants.ATTRIBUTES_DELETED.equals(msgType)
 | 
			
		||||
                || DataConstants.TIMESERIES_UPDATED.equals(msgType)
 | 
			
		||||
                || DataConstants.ALARM.equals(msgType);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected boolean isSupportedOriginator(EntityType entityType) {
 | 
			
		||||
        switch (entityType) {
 | 
			
		||||
            case DEVICE:
 | 
			
		||||
            case ASSET:
 | 
			
		||||
            case ENTITY_VIEW:
 | 
			
		||||
            case DASHBOARD:
 | 
			
		||||
            case TENANT:
 | 
			
		||||
            case CUSTOMER:
 | 
			
		||||
            case EDGE:
 | 
			
		||||
                return true;
 | 
			
		||||
            default:
 | 
			
		||||
                return false;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,33 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2022 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.edge;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.rule.engine.api.NodeConfiguration;
 | 
			
		||||
import org.thingsboard.server.common.data.DataConstants;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
public class BaseTbMsgPushNodeConfiguration implements NodeConfiguration<BaseTbMsgPushNodeConfiguration> {
 | 
			
		||||
 | 
			
		||||
    private String scope;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public BaseTbMsgPushNodeConfiguration defaultConfiguration() {
 | 
			
		||||
        BaseTbMsgPushNodeConfiguration configuration = new BaseTbMsgPushNodeConfiguration();
 | 
			
		||||
        configuration.setScope(DataConstants.SERVER_SCOPE);
 | 
			
		||||
        return configuration;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -15,17 +15,19 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.edge;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleChainType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
@RuleNode(
 | 
			
		||||
        type = ComponentType.ACTION,
 | 
			
		||||
@ -57,22 +59,37 @@ import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
        icon = "cloud_upload",
 | 
			
		||||
        ruleChainTypes = RuleChainType.EDGE
 | 
			
		||||
)
 | 
			
		||||
public class TbMsgPushToCloudNode implements TbNode {
 | 
			
		||||
public class TbMsgPushToCloudNode extends AbstractTbMsgPushNode<TbMsgPushToCloudNodeConfiguration, Object, Object> {
 | 
			
		||||
 | 
			
		||||
    private TbMsgPushToCloudNodeConfiguration config;
 | 
			
		||||
    // Implementation of this node is done on the Edge
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, TbMsgPushToCloudNodeConfiguration.class);
 | 
			
		||||
    Object buildEvent(TenantId tenantId, EdgeEventActionType edgeEventAction, UUID entityId, Object edgeEventType, JsonNode entityBody) {
 | 
			
		||||
        return null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        // Implementation of this node is done on the Edge
 | 
			
		||||
    Object getEventTypeByEntityType(EntityType entityType) {
 | 
			
		||||
        return null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void destroy() {
 | 
			
		||||
    Object getAlarmEventType() {
 | 
			
		||||
        return null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    String getIgnoredMessageSource() {
 | 
			
		||||
        return null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected Class<TbMsgPushToCloudNodeConfiguration> getConfigClazz() {
 | 
			
		||||
        return TbMsgPushToCloudNodeConfiguration.class;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    void processMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -16,13 +16,12 @@
 | 
			
		||||
package org.thingsboard.rule.engine.edge;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.rule.engine.api.NodeConfiguration;
 | 
			
		||||
import lombok.EqualsAndHashCode;
 | 
			
		||||
import org.thingsboard.server.common.data.DataConstants;
 | 
			
		||||
 | 
			
		||||
@EqualsAndHashCode(callSuper = true)
 | 
			
		||||
@Data
 | 
			
		||||
public class TbMsgPushToCloudNodeConfiguration implements NodeConfiguration<TbMsgPushToCloudNodeConfiguration> {
 | 
			
		||||
 | 
			
		||||
    private String scope;
 | 
			
		||||
public class TbMsgPushToCloudNodeConfiguration extends BaseTbMsgPushNodeConfiguration {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbMsgPushToCloudNodeConfiguration defaultConfiguration() {
 | 
			
		||||
 | 
			
		||||
@ -15,23 +15,13 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.edge;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.core.type.TypeReference;
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.google.common.util.concurrent.FutureCallback;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNode;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.DataConstants;
 | 
			
		||||
import org.thingsboard.server.common.data.EdgeUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
 | 
			
		||||
@ -42,12 +32,7 @@ import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleChainType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.Nullable;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
 | 
			
		||||
@ -84,119 +69,13 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
 | 
			
		||||
        icon = "cloud_download",
 | 
			
		||||
        ruleChainTypes = RuleChainType.CORE
 | 
			
		||||
)
 | 
			
		||||
public class TbMsgPushToEdgeNode implements TbNode {
 | 
			
		||||
public class TbMsgPushToEdgeNode extends AbstractTbMsgPushNode<TbMsgPushToEdgeNodeConfiguration, EdgeEvent, EdgeEventType> {
 | 
			
		||||
 | 
			
		||||
    private TbMsgPushToEdgeNodeConfiguration config;
 | 
			
		||||
 | 
			
		||||
    private static final String SCOPE = "scope";
 | 
			
		||||
 | 
			
		||||
    private static final int DEFAULT_PAGE_SIZE = 1000;
 | 
			
		||||
    static final int DEFAULT_PAGE_SIZE = 100;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, TbMsgPushToEdgeNodeConfiguration.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        if (DataConstants.EDGE_MSG_SOURCE.equalsIgnoreCase(msg.getMetaData().getValue(DataConstants.MSG_SOURCE_KEY))) {
 | 
			
		||||
            log.debug("Ignoring msg from the cloud, msg [{}]", msg);
 | 
			
		||||
            ctx.ack(msg);
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        if (isSupportedOriginator(msg.getOriginator().getEntityType())) {
 | 
			
		||||
            if (isSupportedMsgType(msg.getType())) {
 | 
			
		||||
                processMsg(ctx, msg);
 | 
			
		||||
            } else {
 | 
			
		||||
                log.debug("Unsupported msg type {}", msg.getType());
 | 
			
		||||
                ctx.tellFailure(msg, new RuntimeException("Unsupported msg type '" + msg.getType() + "'"));
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            log.debug("Unsupported originator type {}", msg.getOriginator().getEntityType());
 | 
			
		||||
            ctx.tellFailure(msg, new RuntimeException("Unsupported originator type '" + msg.getOriginator().getEntityType() + "'"));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void processMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        if (EntityType.EDGE.equals(msg.getOriginator().getEntityType())) {
 | 
			
		||||
            EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx);
 | 
			
		||||
            if (edgeEvent != null) {
 | 
			
		||||
                EdgeId edgeId = new EdgeId(msg.getOriginator().getId());
 | 
			
		||||
                notifyEdge(ctx, msg, edgeEvent, edgeId);
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
 | 
			
		||||
            PageData<EdgeId> pageData;
 | 
			
		||||
            do {
 | 
			
		||||
                pageData = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator(), pageLink);
 | 
			
		||||
                if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
 | 
			
		||||
                    for (EdgeId edgeId : pageData.getData()) {
 | 
			
		||||
                        EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx);
 | 
			
		||||
                        if (edgeEvent == null) {
 | 
			
		||||
                            log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType());
 | 
			
		||||
                            ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'"));
 | 
			
		||||
                        } else {
 | 
			
		||||
                            notifyEdge(ctx, msg, edgeEvent, edgeId);
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                    if (pageData.hasNext()) {
 | 
			
		||||
                        pageLink = pageLink.nextPageLink();
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            } while (pageData != null && pageData.hasNext());
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void notifyEdge(TbContext ctx, TbMsg msg, EdgeEvent edgeEvent, EdgeId edgeId) {
 | 
			
		||||
        edgeEvent.setEdgeId(edgeId);
 | 
			
		||||
        ctx.getEdgeEventService().save(edgeEvent);
 | 
			
		||||
        ctx.tellNext(msg, SUCCESS);
 | 
			
		||||
        ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private EdgeEvent buildEdgeEvent(TbMsg msg, TbContext ctx) {
 | 
			
		||||
        String msgType = msg.getType();
 | 
			
		||||
        if (DataConstants.ALARM.equals(msgType)) {
 | 
			
		||||
            return buildEdgeEvent(ctx.getTenantId(), EdgeEventActionType.ADDED, getUUIDFromMsgData(msg), EdgeEventType.ALARM, null);
 | 
			
		||||
        } else {
 | 
			
		||||
            EdgeEventType edgeEventTypeByEntityType = EdgeUtils.getEdgeEventTypeByEntityType(msg.getOriginator().getEntityType());
 | 
			
		||||
            if (edgeEventTypeByEntityType == null) {
 | 
			
		||||
                return null;
 | 
			
		||||
            }
 | 
			
		||||
            EdgeEventActionType actionType = getEdgeEventActionTypeByMsgType(msgType);
 | 
			
		||||
            Map<String, Object> entityBody = new HashMap<>();
 | 
			
		||||
            Map<String, String> metadata = msg.getMetaData().getData();
 | 
			
		||||
            JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData());
 | 
			
		||||
            switch (actionType) {
 | 
			
		||||
                case ATTRIBUTES_UPDATED:
 | 
			
		||||
                case POST_ATTRIBUTES:
 | 
			
		||||
                    entityBody.put("kv", dataJson);
 | 
			
		||||
                    entityBody.put(SCOPE, getScope(metadata));
 | 
			
		||||
                    break;
 | 
			
		||||
                case ATTRIBUTES_DELETED:
 | 
			
		||||
                    List<String> keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() {
 | 
			
		||||
                    });
 | 
			
		||||
                    entityBody.put("keys", keys);
 | 
			
		||||
                    entityBody.put(SCOPE, getScope(metadata));
 | 
			
		||||
                    break;
 | 
			
		||||
                case TIMESERIES_UPDATED:
 | 
			
		||||
                    entityBody.put("data", dataJson);
 | 
			
		||||
                    entityBody.put("ts", metadata.get("ts"));
 | 
			
		||||
                    break;
 | 
			
		||||
            }
 | 
			
		||||
            return buildEdgeEvent(ctx.getTenantId(), actionType, msg.getOriginator().getId(), edgeEventTypeByEntityType, JacksonUtil.valueToTree(entityBody));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private String getScope(Map<String, String> metadata) {
 | 
			
		||||
        String scope = metadata.get(SCOPE);
 | 
			
		||||
        if (StringUtils.isEmpty(scope)) {
 | 
			
		||||
            scope = config.getScope();
 | 
			
		||||
        }
 | 
			
		||||
        return scope;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private EdgeEvent buildEdgeEvent(TenantId tenantId, EdgeEventActionType edgeEventAction, UUID entityId, EdgeEventType edgeEventType, JsonNode entityBody) {
 | 
			
		||||
    EdgeEvent buildEvent(TenantId tenantId, EdgeEventActionType edgeEventAction, UUID entityId,
 | 
			
		||||
                         EdgeEventType edgeEventType, JsonNode entityBody) {
 | 
			
		||||
        EdgeEvent edgeEvent = new EdgeEvent();
 | 
			
		||||
        edgeEvent.setTenantId(tenantId);
 | 
			
		||||
        edgeEvent.setAction(edgeEventAction);
 | 
			
		||||
@ -206,51 +85,75 @@ public class TbMsgPushToEdgeNode implements TbNode {
 | 
			
		||||
        return edgeEvent;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private UUID getUUIDFromMsgData(TbMsg msg) {
 | 
			
		||||
        JsonNode data = JacksonUtil.toJsonNode(msg.getData()).get("id");
 | 
			
		||||
        String id = JacksonUtil.convertValue(data.get("id"), String.class);
 | 
			
		||||
        return UUID.fromString(id);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private EdgeEventActionType getEdgeEventActionTypeByMsgType(String msgType) {
 | 
			
		||||
        EdgeEventActionType actionType;
 | 
			
		||||
        if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)) {
 | 
			
		||||
            actionType = EdgeEventActionType.TIMESERIES_UPDATED;
 | 
			
		||||
        } else if (DataConstants.ATTRIBUTES_UPDATED.equals(msgType)) {
 | 
			
		||||
            actionType = EdgeEventActionType.ATTRIBUTES_UPDATED;
 | 
			
		||||
        } else if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)) {
 | 
			
		||||
            actionType = EdgeEventActionType.POST_ATTRIBUTES;
 | 
			
		||||
        } else {
 | 
			
		||||
            actionType = EdgeEventActionType.ATTRIBUTES_DELETED;
 | 
			
		||||
        }
 | 
			
		||||
        return actionType;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean isSupportedOriginator(EntityType entityType) {
 | 
			
		||||
        switch (entityType) {
 | 
			
		||||
            case DEVICE:
 | 
			
		||||
            case ASSET:
 | 
			
		||||
            case ENTITY_VIEW:
 | 
			
		||||
            case DASHBOARD:
 | 
			
		||||
            case TENANT:
 | 
			
		||||
            case CUSTOMER:
 | 
			
		||||
            case EDGE:
 | 
			
		||||
                return true;
 | 
			
		||||
            default:
 | 
			
		||||
                return false;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean isSupportedMsgType(String msgType) {
 | 
			
		||||
        return SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)
 | 
			
		||||
                || SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)
 | 
			
		||||
                || DataConstants.ATTRIBUTES_UPDATED.equals(msgType)
 | 
			
		||||
                || DataConstants.ATTRIBUTES_DELETED.equals(msgType)
 | 
			
		||||
                || DataConstants.ALARM.equals(msgType);
 | 
			
		||||
    @Override
 | 
			
		||||
    EdgeEventType getEventTypeByEntityType(EntityType entityType) {
 | 
			
		||||
        return EdgeUtils.getEdgeEventTypeByEntityType(entityType);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void destroy() {
 | 
			
		||||
    EdgeEventType getAlarmEventType() {
 | 
			
		||||
        return EdgeEventType.ALARM;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    String getIgnoredMessageSource() {
 | 
			
		||||
        return DataConstants.EDGE_MSG_SOURCE;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected Class<TbMsgPushToEdgeNodeConfiguration> getConfigClazz() {
 | 
			
		||||
        return TbMsgPushToEdgeNodeConfiguration.class;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected void processMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        if (EntityType.EDGE.equals(msg.getOriginator().getEntityType())) {
 | 
			
		||||
            EdgeEvent edgeEvent = buildEvent(msg, ctx);
 | 
			
		||||
            if (edgeEvent != null) {
 | 
			
		||||
                EdgeId edgeId = new EdgeId(msg.getOriginator().getId());
 | 
			
		||||
                notifyEdge(ctx, msg, edgeEvent, edgeId);
 | 
			
		||||
            } else {
 | 
			
		||||
                tellFailure(ctx, msg);
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
 | 
			
		||||
            PageData<EdgeId> pageData;
 | 
			
		||||
            boolean edgeNotified = false;
 | 
			
		||||
            do {
 | 
			
		||||
                pageData = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator(), pageLink);
 | 
			
		||||
                if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
 | 
			
		||||
                    for (EdgeId edgeId : pageData.getData()) {
 | 
			
		||||
                        EdgeEvent edgeEvent = buildEvent(msg, ctx);
 | 
			
		||||
                        if (edgeEvent != null) {
 | 
			
		||||
                            notifyEdge(ctx, msg, edgeEvent, edgeId);
 | 
			
		||||
                            edgeNotified = true;
 | 
			
		||||
                        } else {
 | 
			
		||||
                            tellFailure(ctx, msg);
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                    if (pageData.hasNext()) {
 | 
			
		||||
                        pageLink = pageLink.nextPageLink();
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            } while (pageData != null && pageData.hasNext());
 | 
			
		||||
 | 
			
		||||
            if (!edgeNotified) {
 | 
			
		||||
                // ack in case no edges are related to provided entity
 | 
			
		||||
                ctx.ack(msg);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void tellFailure(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        String errMsg = String.format("Edge event type is null. Entity Type %s", msg.getOriginator().getEntityType());
 | 
			
		||||
        log.warn(errMsg);
 | 
			
		||||
        ctx.tellFailure(msg, new RuntimeException(errMsg));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void notifyEdge(TbContext ctx, TbMsg msg, EdgeEvent edgeEvent, EdgeId edgeId) {
 | 
			
		||||
        edgeEvent.setEdgeId(edgeId);
 | 
			
		||||
        ctx.getEdgeEventService().save(edgeEvent);
 | 
			
		||||
        ctx.tellNext(msg, SUCCESS);
 | 
			
		||||
        ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -16,13 +16,12 @@
 | 
			
		||||
package org.thingsboard.rule.engine.edge;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.rule.engine.api.NodeConfiguration;
 | 
			
		||||
import lombok.EqualsAndHashCode;
 | 
			
		||||
import org.thingsboard.server.common.data.DataConstants;
 | 
			
		||||
 | 
			
		||||
@EqualsAndHashCode(callSuper = true)
 | 
			
		||||
@Data
 | 
			
		||||
public class TbMsgPushToEdgeNodeConfiguration implements NodeConfiguration<TbMsgPushToEdgeNodeConfiguration> {
 | 
			
		||||
 | 
			
		||||
    private String scope;
 | 
			
		||||
public class TbMsgPushToEdgeNodeConfiguration extends BaseTbMsgPushNodeConfiguration {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbMsgPushToEdgeNodeConfiguration defaultConfiguration() {
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,76 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2022 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.edge;
 | 
			
		||||
 | 
			
		||||
import org.junit.Before;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.junit.runner.RunWith;
 | 
			
		||||
import org.mockito.Mock;
 | 
			
		||||
import org.mockito.Mockito;
 | 
			
		||||
import org.mockito.junit.MockitoJUnitRunner;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgDataType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
 | 
			
		||||
import org.thingsboard.server.dao.edge.EdgeService;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
import static org.mockito.Mockito.verify;
 | 
			
		||||
 | 
			
		||||
@RunWith(MockitoJUnitRunner.class)
 | 
			
		||||
public class TbMsgPushToEdgeNodeTest {
 | 
			
		||||
 | 
			
		||||
    TbMsgPushToEdgeNode node;
 | 
			
		||||
 | 
			
		||||
    private final TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
 | 
			
		||||
    private final DeviceId deviceId = new DeviceId(UUID.randomUUID());
 | 
			
		||||
 | 
			
		||||
    @Mock
 | 
			
		||||
    private TbContext ctx;
 | 
			
		||||
 | 
			
		||||
    @Mock
 | 
			
		||||
    private EdgeService edgeService;
 | 
			
		||||
 | 
			
		||||
    @Before
 | 
			
		||||
    public void setUp() throws TbNodeException {
 | 
			
		||||
        node = new TbMsgPushToEdgeNode();
 | 
			
		||||
        TbMsgPushToEdgeNodeConfiguration config = new TbMsgPushToEdgeNodeConfiguration().defaultConfiguration();
 | 
			
		||||
        node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void ackMsgInCaseNoEdgeRelated() {
 | 
			
		||||
        Mockito.when(ctx.getTenantId()).thenReturn(tenantId);
 | 
			
		||||
        Mockito.when(ctx.getEdgeService()).thenReturn(edgeService);
 | 
			
		||||
        Mockito.when(edgeService.findRelatedEdgeIdsByEntityId(tenantId, deviceId, new PageLink(TbMsgPushToEdgeNode.DEFAULT_PAGE_SIZE))).thenReturn(new PageData<>());
 | 
			
		||||
 | 
			
		||||
        TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, new TbMsgMetaData(),
 | 
			
		||||
                TbMsgDataType.JSON, "{}", null, null);
 | 
			
		||||
 | 
			
		||||
        node.onMsg(ctx, msg);
 | 
			
		||||
 | 
			
		||||
        verify(ctx).ack(msg);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user