Merge pull request #6437 from volodymyr-babak/fix-push-to-edge-node
[3.4] Fixes for bug - push to edge node generates timeout message
This commit is contained in:
commit
65be4cd16c
@ -1,48 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
|
||||
public final class EdgeEventUtils {
|
||||
|
||||
private EdgeEventUtils() {
|
||||
}
|
||||
|
||||
public static EdgeEvent constructEdgeEvent(TenantId tenantId,
|
||||
EdgeId edgeId,
|
||||
EdgeEventType type,
|
||||
EdgeEventActionType action,
|
||||
EntityId entityId,
|
||||
JsonNode body) {
|
||||
EdgeEvent edgeEvent = new EdgeEvent();
|
||||
edgeEvent.setTenantId(tenantId);
|
||||
edgeEvent.setEdgeId(edgeId);
|
||||
edgeEvent.setType(type);
|
||||
edgeEvent.setAction(action);
|
||||
if (entityId != null) {
|
||||
edgeEvent.setEntityId(entityId.getId());
|
||||
}
|
||||
edgeEvent.setBody(body);
|
||||
return edgeEvent;
|
||||
}
|
||||
}
|
||||
@ -15,7 +15,6 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import io.grpc.Server;
|
||||
@ -71,7 +70,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
private final ConcurrentMap<EdgeId, Lock> sessionNewEventsLocks = new ConcurrentHashMap<>();
|
||||
private final Map<EdgeId, Boolean> sessionNewEvents = new HashMap<>();
|
||||
private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>();
|
||||
private static final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
@Value("${edges.rpc.port}")
|
||||
private int rpcPort;
|
||||
@ -159,7 +157,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
|
||||
@Override
|
||||
public StreamObserver<RequestMsg> handleMsgs(StreamObserver<ResponseMsg> outputStream) {
|
||||
return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper, sendDownlinkExecutorService).getInputStream();
|
||||
return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, sendDownlinkExecutorService).getInputStream();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -16,7 +16,6 @@
|
||||
package org.thingsboard.server.service.edge.rpc;
|
||||
|
||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
@ -96,7 +95,6 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
private final UUID sessionId;
|
||||
private final BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener;
|
||||
private final Consumer<EdgeId> sessionCloseListener;
|
||||
private final ObjectMapper mapper;
|
||||
|
||||
private final EdgeSessionState sessionState = new EdgeSessionState();
|
||||
|
||||
@ -112,13 +110,12 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
private ScheduledExecutorService sendDownlinkExecutorService;
|
||||
|
||||
EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream, BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener,
|
||||
Consumer<EdgeId> sessionCloseListener, ObjectMapper mapper, ScheduledExecutorService sendDownlinkExecutorService) {
|
||||
Consumer<EdgeId> sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService) {
|
||||
this.sessionId = UUID.randomUUID();
|
||||
this.ctx = ctx;
|
||||
this.outputStream = outputStream;
|
||||
this.sessionOpenListener = sessionOpenListener;
|
||||
this.sessionCloseListener = sessionCloseListener;
|
||||
this.mapper = mapper;
|
||||
this.sendDownlinkExecutorService = sendDownlinkExecutorService;
|
||||
initInputStream();
|
||||
}
|
||||
@ -402,11 +399,11 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
Runnable sendDownlinkMsgsTask = () -> {
|
||||
try {
|
||||
if (isConnected() && sessionState.getPendingMsgsMap().values().size() > 0) {
|
||||
if (!firstRun) {
|
||||
log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, sessionState.getPendingMsgsMap().values());
|
||||
}
|
||||
log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, sessionState.getPendingMsgsMap().values().size());
|
||||
List<DownlinkMsg> copy = new ArrayList<>(sessionState.getPendingMsgsMap().values());
|
||||
if (!firstRun) {
|
||||
log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, copy);
|
||||
}
|
||||
log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, copy.size());
|
||||
for (DownlinkMsg downlinkMsg : copy) {
|
||||
sendDownlinkMsg(ResponseMsg.newBuilder()
|
||||
.setDownlinkMsg(downlinkMsg)
|
||||
|
||||
@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.text.WordUtils;
|
||||
import org.thingsboard.server.common.data.AdminSettings;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
@ -35,7 +36,6 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.dao.settings.AdminSettingsService;
|
||||
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@ -80,19 +80,19 @@ public class AdminSettingsEdgeEventFetcher implements EdgeEventFetcher {
|
||||
List<EdgeEvent> result = new ArrayList<>();
|
||||
|
||||
AdminSettings systemMailSettings = adminSettingsService.findAdminSettingsByKey(TenantId.SYS_TENANT_ID, "mail");
|
||||
result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
||||
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
||||
EdgeEventActionType.UPDATED, null, mapper.valueToTree(systemMailSettings)));
|
||||
|
||||
AdminSettings tenantMailSettings = convertToTenantAdminSettings(systemMailSettings.getKey(), (ObjectNode) systemMailSettings.getJsonValue());
|
||||
result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
||||
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
||||
EdgeEventActionType.UPDATED, null, mapper.valueToTree(tenantMailSettings)));
|
||||
|
||||
AdminSettings systemMailTemplates = loadMailTemplates();
|
||||
result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
||||
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
||||
EdgeEventActionType.UPDATED, null, mapper.valueToTree(systemMailTemplates)));
|
||||
|
||||
AdminSettings tenantMailTemplates = convertToTenantAdminSettings(systemMailTemplates.getKey(), (ObjectNode) systemMailTemplates.getJsonValue());
|
||||
result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
||||
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
||||
EdgeEventActionType.UPDATED, null, mapper.valueToTree(tenantMailTemplates)));
|
||||
|
||||
// @voba - returns PageData object to be in sync with other fetchers
|
||||
|
||||
@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.fetch;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
@ -26,7 +27,6 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.dao.asset.AssetService;
|
||||
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
|
||||
|
||||
@AllArgsConstructor
|
||||
@Slf4j
|
||||
@ -41,7 +41,7 @@ public class AssetsEdgeEventFetcher extends BasePageableEdgeEventFetcher<Asset>
|
||||
|
||||
@Override
|
||||
EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, Asset asset) {
|
||||
return EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ASSET,
|
||||
return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ASSET,
|
||||
EdgeEventActionType.ADDED, asset.getId(), null);
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.fetch;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.User;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
@ -26,7 +27,6 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.dao.user.UserService;
|
||||
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
|
||||
|
||||
@Slf4j
|
||||
@AllArgsConstructor
|
||||
@ -41,7 +41,7 @@ public abstract class BaseUsersEdgeEventFetcher extends BasePageableEdgeEventFet
|
||||
|
||||
@Override
|
||||
EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, User user) {
|
||||
return EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER,
|
||||
return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER,
|
||||
EdgeEventActionType.ADDED, user.getId(), null);
|
||||
}
|
||||
|
||||
|
||||
@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.fetch;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
@ -26,7 +27,6 @@ import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.widget.WidgetsBundle;
|
||||
import org.thingsboard.server.dao.widget.WidgetsBundleService;
|
||||
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
|
||||
|
||||
@Slf4j
|
||||
@AllArgsConstructor
|
||||
@ -41,7 +41,7 @@ public abstract class BaseWidgetsBundlesEdgeEventFetcher extends BasePageableEdg
|
||||
|
||||
@Override
|
||||
EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, WidgetsBundle widgetsBundle) {
|
||||
return EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGETS_BUNDLE,
|
||||
return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGETS_BUNDLE,
|
||||
EdgeEventActionType.ADDED, widgetsBundle.getId(), null);
|
||||
}
|
||||
|
||||
|
||||
@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.fetch;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
@ -24,7 +25,6 @@ import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@ -41,7 +41,7 @@ public class CustomerEdgeEventFetcher implements EdgeEventFetcher {
|
||||
@Override
|
||||
public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) {
|
||||
List<EdgeEvent> result = new ArrayList<>();
|
||||
result.add(EdgeEventUtils.constructEdgeEvent(edge.getTenantId(), edge.getId(),
|
||||
result.add(EdgeUtils.constructEdgeEvent(edge.getTenantId(), edge.getId(),
|
||||
EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, edge.getCustomerId(), null));
|
||||
// @voba - returns PageData object to be in sync with other fetchers
|
||||
return new PageData<>(result, 1, result.size(), false);
|
||||
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc.fetch;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.DashboardInfo;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
@ -26,7 +27,6 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.dao.dashboard.DashboardService;
|
||||
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
|
||||
|
||||
@AllArgsConstructor
|
||||
@Slf4j
|
||||
@ -41,7 +41,7 @@ public class DashboardsEdgeEventFetcher extends BasePageableEdgeEventFetcher<Das
|
||||
|
||||
@Override
|
||||
EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, DashboardInfo dashboardInfo) {
|
||||
return EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.DASHBOARD,
|
||||
return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.DASHBOARD,
|
||||
EdgeEventActionType.ADDED, dashboardInfo.getId(), null);
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc.fetch;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
@ -26,7 +27,6 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.dao.device.DeviceProfileService;
|
||||
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
|
||||
|
||||
@AllArgsConstructor
|
||||
@Slf4j
|
||||
@ -41,7 +41,7 @@ public class DeviceProfilesEdgeEventFetcher extends BasePageableEdgeEventFetcher
|
||||
|
||||
@Override
|
||||
EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, DeviceProfile deviceProfile) {
|
||||
return EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE_PROFILE,
|
||||
return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE_PROFILE,
|
||||
EdgeEventActionType.ADDED, deviceProfile.getId(), null);
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.fetch;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
@ -26,7 +27,6 @@ import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.rule.RuleChain;
|
||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
|
||||
|
||||
@Slf4j
|
||||
@AllArgsConstructor
|
||||
@ -41,7 +41,7 @@ public class RuleChainsEdgeEventFetcher extends BasePageableEdgeEventFetcher<Rul
|
||||
|
||||
@Override
|
||||
EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, RuleChain ruleChain) {
|
||||
return EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN,
|
||||
return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN,
|
||||
EdgeEventActionType.ADDED, ruleChain.getId(), null);
|
||||
}
|
||||
}
|
||||
|
||||
@ -71,7 +71,6 @@ import org.thingsboard.server.gen.edge.v1.RelationRequestMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.RuleChainMetadataRequestMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UserCredentialsRequestMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.WidgetBundleTypesRequestMsg;
|
||||
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
|
||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||
import org.thingsboard.server.cluster.TbClusterService;
|
||||
|
||||
@ -404,7 +403,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
||||
log.trace("Pushing edge event to edge queue. tenantId [{}], edgeId [{}], type [{}], action[{}], entityId [{}], body [{}]",
|
||||
tenantId, edgeId, type, action, entityId, body);
|
||||
|
||||
EdgeEvent edgeEvent = EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body);
|
||||
EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body);
|
||||
|
||||
edgeEventService.save(edgeEvent);
|
||||
tbClusterService.onEdgeEventUpdate(tenantId, edgeId);
|
||||
|
||||
@ -15,8 +15,14 @@
|
||||
*/
|
||||
package org.thingsboard.server.common.data;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
@ -63,4 +69,22 @@ public final class EdgeUtils {
|
||||
public static int nextPositiveInt() {
|
||||
return ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
public static EdgeEvent constructEdgeEvent(TenantId tenantId,
|
||||
EdgeId edgeId,
|
||||
EdgeEventType type,
|
||||
EdgeEventActionType action,
|
||||
EntityId entityId,
|
||||
JsonNode body) {
|
||||
EdgeEvent edgeEvent = new EdgeEvent();
|
||||
edgeEvent.setTenantId(tenantId);
|
||||
edgeEvent.setEdgeId(edgeId);
|
||||
edgeEvent.setType(type);
|
||||
edgeEvent.setAction(action);
|
||||
if (entityId != null) {
|
||||
edgeEvent.setEntityId(entityId.getId());
|
||||
}
|
||||
edgeEvent.setBody(body);
|
||||
return edgeEvent;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,178 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.edge;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNode;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
@Slf4j
|
||||
public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfiguration, S, U> implements TbNode {
|
||||
|
||||
protected T config;
|
||||
|
||||
private static final String SCOPE = "scope";
|
||||
|
||||
@Override
|
||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||
this.config = TbNodeUtils.convert(configuration, getConfigClazz());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
if (getIgnoredMessageSource().equalsIgnoreCase(msg.getMetaData().getValue(DataConstants.MSG_SOURCE_KEY))) {
|
||||
log.debug("Ignoring msg from the {}, msg [{}]", getIgnoredMessageSource(), msg);
|
||||
ctx.ack(msg);
|
||||
return;
|
||||
}
|
||||
if (isSupportedOriginator(msg.getOriginator().getEntityType())) {
|
||||
if (isSupportedMsgType(msg.getType())) {
|
||||
processMsg(ctx, msg);
|
||||
} else {
|
||||
String errMsg = String.format("Unsupported msg type %s", msg.getType());
|
||||
log.debug(errMsg);
|
||||
ctx.tellFailure(msg, new RuntimeException(errMsg));
|
||||
}
|
||||
} else {
|
||||
String errMsg = String.format("Unsupported originator type %s", msg.getOriginator().getEntityType());
|
||||
log.debug(errMsg);
|
||||
ctx.tellFailure(msg, new RuntimeException(errMsg));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
}
|
||||
|
||||
protected S buildEvent(TbMsg msg, TbContext ctx) {
|
||||
String msgType = msg.getType();
|
||||
if (DataConstants.ALARM.equals(msgType)) {
|
||||
return buildEvent(ctx.getTenantId(), EdgeEventActionType.ADDED, getUUIDFromMsgData(msg), getAlarmEventType(), null);
|
||||
} else {
|
||||
U eventTypeByEntityType = getEventTypeByEntityType(msg.getOriginator().getEntityType());
|
||||
if (eventTypeByEntityType == null) {
|
||||
return null;
|
||||
}
|
||||
EdgeEventActionType actionType = getEdgeEventActionTypeByMsgType(msgType);
|
||||
Map<String, Object> entityBody = new HashMap<>();
|
||||
Map<String, String> metadata = msg.getMetaData().getData();
|
||||
JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData());
|
||||
switch (actionType) {
|
||||
case ATTRIBUTES_UPDATED:
|
||||
case POST_ATTRIBUTES:
|
||||
entityBody.put("kv", dataJson);
|
||||
entityBody.put(SCOPE, getScope(metadata));
|
||||
if (EdgeEventActionType.POST_ATTRIBUTES.equals(actionType)) {
|
||||
entityBody.put("isPostAttributes", true);
|
||||
}
|
||||
break;
|
||||
case ATTRIBUTES_DELETED:
|
||||
List<String> keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() {});
|
||||
entityBody.put("keys", keys);
|
||||
entityBody.put(SCOPE, getScope(metadata));
|
||||
break;
|
||||
case TIMESERIES_UPDATED:
|
||||
entityBody.put("data", dataJson);
|
||||
entityBody.put("ts", msg.getMetaDataTs());
|
||||
break;
|
||||
}
|
||||
return buildEvent(ctx.getTenantId(), actionType, msg.getOriginator().getId(), eventTypeByEntityType, JacksonUtil.valueToTree(entityBody));
|
||||
}
|
||||
}
|
||||
|
||||
abstract S buildEvent(TenantId tenantId, EdgeEventActionType eventAction, UUID entityId, U eventType, JsonNode entityBody);
|
||||
|
||||
abstract U getEventTypeByEntityType(EntityType entityType);
|
||||
|
||||
abstract U getAlarmEventType();
|
||||
|
||||
abstract String getIgnoredMessageSource();
|
||||
|
||||
abstract protected Class<T> getConfigClazz();
|
||||
|
||||
abstract void processMsg(TbContext ctx, TbMsg msg);
|
||||
|
||||
protected UUID getUUIDFromMsgData(TbMsg msg) {
|
||||
JsonNode data = JacksonUtil.toJsonNode(msg.getData()).get("id");
|
||||
String id = JacksonUtil.convertValue(data.get("id"), String.class);
|
||||
return UUID.fromString(id);
|
||||
}
|
||||
|
||||
protected String getScope(Map<String, String> metadata) {
|
||||
String scope = metadata.get(SCOPE);
|
||||
if (StringUtils.isEmpty(scope)) {
|
||||
scope = config.getScope();
|
||||
}
|
||||
return scope;
|
||||
}
|
||||
|
||||
protected EdgeEventActionType getEdgeEventActionTypeByMsgType(String msgType) {
|
||||
EdgeEventActionType actionType;
|
||||
if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)
|
||||
|| DataConstants.TIMESERIES_UPDATED.equals(msgType)) {
|
||||
actionType = EdgeEventActionType.TIMESERIES_UPDATED;
|
||||
} else if (DataConstants.ATTRIBUTES_UPDATED.equals(msgType)) {
|
||||
actionType = EdgeEventActionType.ATTRIBUTES_UPDATED;
|
||||
} else if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)) {
|
||||
actionType = EdgeEventActionType.POST_ATTRIBUTES;
|
||||
} else {
|
||||
actionType = EdgeEventActionType.ATTRIBUTES_DELETED;
|
||||
}
|
||||
return actionType;
|
||||
}
|
||||
|
||||
protected boolean isSupportedMsgType(String msgType) {
|
||||
return SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)
|
||||
|| SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)
|
||||
|| DataConstants.ATTRIBUTES_UPDATED.equals(msgType)
|
||||
|| DataConstants.ATTRIBUTES_DELETED.equals(msgType)
|
||||
|| DataConstants.TIMESERIES_UPDATED.equals(msgType)
|
||||
|| DataConstants.ALARM.equals(msgType);
|
||||
}
|
||||
|
||||
protected boolean isSupportedOriginator(EntityType entityType) {
|
||||
switch (entityType) {
|
||||
case DEVICE:
|
||||
case ASSET:
|
||||
case ENTITY_VIEW:
|
||||
case DASHBOARD:
|
||||
case TENANT:
|
||||
case CUSTOMER:
|
||||
case EDGE:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,33 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.edge;
|
||||
|
||||
import lombok.Data;
|
||||
import org.thingsboard.rule.engine.api.NodeConfiguration;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
|
||||
@Data
|
||||
public class BaseTbMsgPushNodeConfiguration implements NodeConfiguration<BaseTbMsgPushNodeConfiguration> {
|
||||
|
||||
private String scope;
|
||||
|
||||
@Override
|
||||
public BaseTbMsgPushNodeConfiguration defaultConfiguration() {
|
||||
BaseTbMsgPushNodeConfiguration configuration = new BaseTbMsgPushNodeConfiguration();
|
||||
configuration.setScope(DataConstants.SERVER_SCOPE);
|
||||
return configuration;
|
||||
}
|
||||
}
|
||||
@ -15,17 +15,19 @@
|
||||
*/
|
||||
package org.thingsboard.rule.engine.edge;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.rule.engine.api.RuleNode;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNode;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainType;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Slf4j
|
||||
@RuleNode(
|
||||
type = ComponentType.ACTION,
|
||||
@ -57,22 +59,37 @@ import org.thingsboard.server.common.msg.TbMsg;
|
||||
icon = "cloud_upload",
|
||||
ruleChainTypes = RuleChainType.EDGE
|
||||
)
|
||||
public class TbMsgPushToCloudNode implements TbNode {
|
||||
public class TbMsgPushToCloudNode extends AbstractTbMsgPushNode<TbMsgPushToCloudNodeConfiguration, Object, Object> {
|
||||
|
||||
private TbMsgPushToCloudNodeConfiguration config;
|
||||
|
||||
@Override
|
||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||
this.config = TbNodeUtils.convert(configuration, TbMsgPushToCloudNodeConfiguration.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
// Implementation of this node is done on the Edge
|
||||
|
||||
@Override
|
||||
Object buildEvent(TenantId tenantId, EdgeEventActionType eventAction, UUID entityId, Object eventType, JsonNode entityBody) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
Object getEventTypeByEntityType(EntityType entityType) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
Object getAlarmEventType() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
String getIgnoredMessageSource() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<TbMsgPushToCloudNodeConfiguration> getConfigClazz() {
|
||||
return TbMsgPushToCloudNodeConfiguration.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
void processMsg(TbContext ctx, TbMsg msg) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -16,13 +16,12 @@
|
||||
package org.thingsboard.rule.engine.edge;
|
||||
|
||||
import lombok.Data;
|
||||
import org.thingsboard.rule.engine.api.NodeConfiguration;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Data
|
||||
public class TbMsgPushToCloudNodeConfiguration implements NodeConfiguration<TbMsgPushToCloudNodeConfiguration> {
|
||||
|
||||
private String scope;
|
||||
public class TbMsgPushToCloudNodeConfiguration extends BaseTbMsgPushNodeConfiguration {
|
||||
|
||||
@Override
|
||||
public TbMsgPushToCloudNodeConfiguration defaultConfiguration() {
|
||||
|
||||
@ -15,23 +15,13 @@
|
||||
*/
|
||||
package org.thingsboard.rule.engine.edge;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.rule.engine.api.RuleNode;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNode;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
@ -42,12 +32,7 @@ import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainType;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
|
||||
@ -84,59 +69,66 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
|
||||
icon = "cloud_download",
|
||||
ruleChainTypes = RuleChainType.CORE
|
||||
)
|
||||
public class TbMsgPushToEdgeNode implements TbNode {
|
||||
public class TbMsgPushToEdgeNode extends AbstractTbMsgPushNode<TbMsgPushToEdgeNodeConfiguration, EdgeEvent, EdgeEventType> {
|
||||
|
||||
private TbMsgPushToEdgeNodeConfiguration config;
|
||||
|
||||
private static final String SCOPE = "scope";
|
||||
|
||||
private static final int DEFAULT_PAGE_SIZE = 1000;
|
||||
static final int DEFAULT_PAGE_SIZE = 100;
|
||||
|
||||
@Override
|
||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||
this.config = TbNodeUtils.convert(configuration, TbMsgPushToEdgeNodeConfiguration.class);
|
||||
EdgeEvent buildEvent(TenantId tenantId, EdgeEventActionType eventAction, UUID entityId,
|
||||
EdgeEventType eventType, JsonNode entityBody) {
|
||||
EdgeEvent edgeEvent = new EdgeEvent();
|
||||
edgeEvent.setTenantId(tenantId);
|
||||
edgeEvent.setAction(eventAction);
|
||||
edgeEvent.setEntityId(entityId);
|
||||
edgeEvent.setType(eventType);
|
||||
edgeEvent.setBody(entityBody);
|
||||
return edgeEvent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
if (DataConstants.EDGE_MSG_SOURCE.equalsIgnoreCase(msg.getMetaData().getValue(DataConstants.MSG_SOURCE_KEY))) {
|
||||
log.debug("Ignoring msg from the cloud, msg [{}]", msg);
|
||||
ctx.ack(msg);
|
||||
return;
|
||||
}
|
||||
if (isSupportedOriginator(msg.getOriginator().getEntityType())) {
|
||||
if (isSupportedMsgType(msg.getType())) {
|
||||
processMsg(ctx, msg);
|
||||
} else {
|
||||
log.debug("Unsupported msg type {}", msg.getType());
|
||||
ctx.tellFailure(msg, new RuntimeException("Unsupported msg type '" + msg.getType() + "'"));
|
||||
}
|
||||
} else {
|
||||
log.debug("Unsupported originator type {}", msg.getOriginator().getEntityType());
|
||||
ctx.tellFailure(msg, new RuntimeException("Unsupported originator type '" + msg.getOriginator().getEntityType() + "'"));
|
||||
}
|
||||
EdgeEventType getEventTypeByEntityType(EntityType entityType) {
|
||||
return EdgeUtils.getEdgeEventTypeByEntityType(entityType);
|
||||
}
|
||||
|
||||
private void processMsg(TbContext ctx, TbMsg msg) {
|
||||
@Override
|
||||
EdgeEventType getAlarmEventType() {
|
||||
return EdgeEventType.ALARM;
|
||||
}
|
||||
|
||||
@Override
|
||||
String getIgnoredMessageSource() {
|
||||
return DataConstants.EDGE_MSG_SOURCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<TbMsgPushToEdgeNodeConfiguration> getConfigClazz() {
|
||||
return TbMsgPushToEdgeNodeConfiguration.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processMsg(TbContext ctx, TbMsg msg) {
|
||||
if (EntityType.EDGE.equals(msg.getOriginator().getEntityType())) {
|
||||
EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx);
|
||||
EdgeEvent edgeEvent = buildEvent(msg, ctx);
|
||||
if (edgeEvent != null) {
|
||||
EdgeId edgeId = new EdgeId(msg.getOriginator().getId());
|
||||
notifyEdge(ctx, msg, edgeEvent, edgeId);
|
||||
} else {
|
||||
tellFailure(ctx, msg);
|
||||
}
|
||||
} else {
|
||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||
PageData<EdgeId> pageData;
|
||||
boolean edgeNotified = false;
|
||||
do {
|
||||
pageData = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator(), pageLink);
|
||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
||||
for (EdgeId edgeId : pageData.getData()) {
|
||||
EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx);
|
||||
if (edgeEvent == null) {
|
||||
log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType());
|
||||
ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'"));
|
||||
} else {
|
||||
EdgeEvent edgeEvent = buildEvent(msg, ctx);
|
||||
if (edgeEvent != null) {
|
||||
notifyEdge(ctx, msg, edgeEvent, edgeId);
|
||||
edgeNotified = true;
|
||||
} else {
|
||||
tellFailure(ctx, msg);
|
||||
}
|
||||
}
|
||||
if (pageData.hasNext()) {
|
||||
@ -144,8 +136,19 @@ public class TbMsgPushToEdgeNode implements TbNode {
|
||||
}
|
||||
}
|
||||
} while (pageData != null && pageData.hasNext());
|
||||
|
||||
if (!edgeNotified) {
|
||||
// ack in case no edges are related to provided entity
|
||||
ctx.ack(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void tellFailure(TbContext ctx, TbMsg msg) {
|
||||
String errMsg = String.format("Edge event type is null. Entity Type %s", msg.getOriginator().getEntityType());
|
||||
log.warn(errMsg);
|
||||
ctx.tellFailure(msg, new RuntimeException(errMsg));
|
||||
}
|
||||
|
||||
private void notifyEdge(TbContext ctx, TbMsg msg, EdgeEvent edgeEvent, EdgeId edgeId) {
|
||||
edgeEvent.setEdgeId(edgeId);
|
||||
@ -154,103 +157,4 @@ public class TbMsgPushToEdgeNode implements TbNode {
|
||||
ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId);
|
||||
}
|
||||
|
||||
private EdgeEvent buildEdgeEvent(TbMsg msg, TbContext ctx) {
|
||||
String msgType = msg.getType();
|
||||
if (DataConstants.ALARM.equals(msgType)) {
|
||||
return buildEdgeEvent(ctx.getTenantId(), EdgeEventActionType.ADDED, getUUIDFromMsgData(msg), EdgeEventType.ALARM, null);
|
||||
} else {
|
||||
EdgeEventType edgeEventTypeByEntityType = EdgeUtils.getEdgeEventTypeByEntityType(msg.getOriginator().getEntityType());
|
||||
if (edgeEventTypeByEntityType == null) {
|
||||
return null;
|
||||
}
|
||||
EdgeEventActionType actionType = getEdgeEventActionTypeByMsgType(msgType);
|
||||
Map<String, Object> entityBody = new HashMap<>();
|
||||
Map<String, String> metadata = msg.getMetaData().getData();
|
||||
JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData());
|
||||
switch (actionType) {
|
||||
case ATTRIBUTES_UPDATED:
|
||||
case POST_ATTRIBUTES:
|
||||
entityBody.put("kv", dataJson);
|
||||
entityBody.put(SCOPE, getScope(metadata));
|
||||
break;
|
||||
case ATTRIBUTES_DELETED:
|
||||
List<String> keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() {
|
||||
});
|
||||
entityBody.put("keys", keys);
|
||||
entityBody.put(SCOPE, getScope(metadata));
|
||||
break;
|
||||
case TIMESERIES_UPDATED:
|
||||
entityBody.put("data", dataJson);
|
||||
entityBody.put("ts", metadata.get("ts"));
|
||||
break;
|
||||
}
|
||||
return buildEdgeEvent(ctx.getTenantId(), actionType, msg.getOriginator().getId(), edgeEventTypeByEntityType, JacksonUtil.valueToTree(entityBody));
|
||||
}
|
||||
}
|
||||
|
||||
private String getScope(Map<String, String> metadata) {
|
||||
String scope = metadata.get(SCOPE);
|
||||
if (StringUtils.isEmpty(scope)) {
|
||||
scope = config.getScope();
|
||||
}
|
||||
return scope;
|
||||
}
|
||||
|
||||
private EdgeEvent buildEdgeEvent(TenantId tenantId, EdgeEventActionType edgeEventAction, UUID entityId, EdgeEventType edgeEventType, JsonNode entityBody) {
|
||||
EdgeEvent edgeEvent = new EdgeEvent();
|
||||
edgeEvent.setTenantId(tenantId);
|
||||
edgeEvent.setAction(edgeEventAction);
|
||||
edgeEvent.setEntityId(entityId);
|
||||
edgeEvent.setType(edgeEventType);
|
||||
edgeEvent.setBody(entityBody);
|
||||
return edgeEvent;
|
||||
}
|
||||
|
||||
private UUID getUUIDFromMsgData(TbMsg msg) {
|
||||
JsonNode data = JacksonUtil.toJsonNode(msg.getData()).get("id");
|
||||
String id = JacksonUtil.convertValue(data.get("id"), String.class);
|
||||
return UUID.fromString(id);
|
||||
}
|
||||
|
||||
private EdgeEventActionType getEdgeEventActionTypeByMsgType(String msgType) {
|
||||
EdgeEventActionType actionType;
|
||||
if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)) {
|
||||
actionType = EdgeEventActionType.TIMESERIES_UPDATED;
|
||||
} else if (DataConstants.ATTRIBUTES_UPDATED.equals(msgType)) {
|
||||
actionType = EdgeEventActionType.ATTRIBUTES_UPDATED;
|
||||
} else if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)) {
|
||||
actionType = EdgeEventActionType.POST_ATTRIBUTES;
|
||||
} else {
|
||||
actionType = EdgeEventActionType.ATTRIBUTES_DELETED;
|
||||
}
|
||||
return actionType;
|
||||
}
|
||||
|
||||
private boolean isSupportedOriginator(EntityType entityType) {
|
||||
switch (entityType) {
|
||||
case DEVICE:
|
||||
case ASSET:
|
||||
case ENTITY_VIEW:
|
||||
case DASHBOARD:
|
||||
case TENANT:
|
||||
case CUSTOMER:
|
||||
case EDGE:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isSupportedMsgType(String msgType) {
|
||||
return SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)
|
||||
|| SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)
|
||||
|| DataConstants.ATTRIBUTES_UPDATED.equals(msgType)
|
||||
|| DataConstants.ATTRIBUTES_DELETED.equals(msgType)
|
||||
|| DataConstants.ALARM.equals(msgType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -16,13 +16,12 @@
|
||||
package org.thingsboard.rule.engine.edge;
|
||||
|
||||
import lombok.Data;
|
||||
import org.thingsboard.rule.engine.api.NodeConfiguration;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Data
|
||||
public class TbMsgPushToEdgeNodeConfiguration implements NodeConfiguration<TbMsgPushToEdgeNodeConfiguration> {
|
||||
|
||||
private String scope;
|
||||
public class TbMsgPushToEdgeNodeConfiguration extends BaseTbMsgPushNodeConfiguration {
|
||||
|
||||
@Override
|
||||
public TbMsgPushToEdgeNodeConfiguration defaultConfiguration() {
|
||||
|
||||
@ -0,0 +1,76 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.edge;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgDataType;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
|
||||
import org.thingsboard.server.dao.edge.EdgeService;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class TbMsgPushToEdgeNodeTest {
|
||||
|
||||
TbMsgPushToEdgeNode node;
|
||||
|
||||
private final TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
|
||||
private final DeviceId deviceId = new DeviceId(UUID.randomUUID());
|
||||
|
||||
@Mock
|
||||
private TbContext ctx;
|
||||
|
||||
@Mock
|
||||
private EdgeService edgeService;
|
||||
|
||||
@Before
|
||||
public void setUp() throws TbNodeException {
|
||||
node = new TbMsgPushToEdgeNode();
|
||||
TbMsgPushToEdgeNodeConfiguration config = new TbMsgPushToEdgeNodeConfiguration().defaultConfiguration();
|
||||
node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void ackMsgInCaseNoEdgeRelated() {
|
||||
Mockito.when(ctx.getTenantId()).thenReturn(tenantId);
|
||||
Mockito.when(ctx.getEdgeService()).thenReturn(edgeService);
|
||||
Mockito.when(edgeService.findRelatedEdgeIdsByEntityId(tenantId, deviceId, new PageLink(TbMsgPushToEdgeNode.DEFAULT_PAGE_SIZE))).thenReturn(new PageData<>());
|
||||
|
||||
TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, new TbMsgMetaData(),
|
||||
TbMsgDataType.JSON, "{}", null, null);
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
verify(ctx).ack(msg);
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user