From fe16336bbbd59b45455104d80c43a3ba21abca99 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 15 Apr 2022 17:33:17 +0300 Subject: [PATCH 1/7] Merge EdgeUtils and EdgeEventUtils --- .../service/edge/rpc/EdgeEventUtils.java | 48 ------------------- .../fetch/AdminSettingsEdgeEventFetcher.java | 10 ++-- .../rpc/fetch/AssetsEdgeEventFetcher.java | 4 +- .../rpc/fetch/BaseUsersEdgeEventFetcher.java | 4 +- .../BaseWidgetsBundlesEdgeEventFetcher.java | 4 +- .../rpc/fetch/CustomerEdgeEventFetcher.java | 4 +- .../rpc/fetch/DashboardsEdgeEventFetcher.java | 4 +- .../fetch/DeviceProfilesEdgeEventFetcher.java | 4 +- .../rpc/fetch/RuleChainsEdgeEventFetcher.java | 4 +- .../rpc/sync/DefaultEdgeRequestsService.java | 3 +- .../server/common/data/EdgeUtils.java | 24 ++++++++++ 11 files changed, 44 insertions(+), 69 deletions(-) delete mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventUtils.java diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventUtils.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventUtils.java deleted file mode 100644 index 0aeac4c16a..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventUtils.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.edge.rpc; - -import com.fasterxml.jackson.databind.JsonNode; -import org.thingsboard.server.common.data.edge.EdgeEvent; -import org.thingsboard.server.common.data.edge.EdgeEventActionType; -import org.thingsboard.server.common.data.edge.EdgeEventType; -import org.thingsboard.server.common.data.id.EdgeId; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.TenantId; - -public final class EdgeEventUtils { - - private EdgeEventUtils() { - } - - public static EdgeEvent constructEdgeEvent(TenantId tenantId, - EdgeId edgeId, - EdgeEventType type, - EdgeEventActionType action, - EntityId entityId, - JsonNode body) { - EdgeEvent edgeEvent = new EdgeEvent(); - edgeEvent.setTenantId(tenantId); - edgeEvent.setEdgeId(edgeId); - edgeEvent.setType(type); - edgeEvent.setAction(action); - if (entityId != null) { - edgeEvent.setEntityId(entityId.getId()); - } - edgeEvent.setBody(body); - return edgeEvent; - } -} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java index 51b84d6a94..0f4ecf7e30 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java @@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.text.WordUtils; import org.thingsboard.server.common.data.AdminSettings; +import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; @@ -35,7 +36,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.settings.AdminSettingsService; -import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; import java.util.ArrayList; import java.util.Arrays; @@ -80,19 +80,19 @@ public class AdminSettingsEdgeEventFetcher implements EdgeEventFetcher { List result = new ArrayList<>(); AdminSettings systemMailSettings = adminSettingsService.findAdminSettingsByKey(TenantId.SYS_TENANT_ID, "mail"); - result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, + result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(systemMailSettings))); AdminSettings tenantMailSettings = convertToTenantAdminSettings(systemMailSettings.getKey(), (ObjectNode) systemMailSettings.getJsonValue()); - result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, + result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(tenantMailSettings))); AdminSettings systemMailTemplates = loadMailTemplates(); - result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, + result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(systemMailTemplates))); AdminSettings tenantMailTemplates = convertToTenantAdminSettings(systemMailTemplates.getKey(), (ObjectNode) systemMailTemplates.getJsonValue()); - result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, + result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(tenantMailTemplates))); // @voba - returns PageData object to be in sync with other fetchers diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AssetsEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AssetsEdgeEventFetcher.java index e66cafda56..ead87638fb 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AssetsEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AssetsEdgeEventFetcher.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.fetch; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; @@ -26,7 +27,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.asset.AssetService; -import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; @AllArgsConstructor @Slf4j @@ -41,7 +41,7 @@ public class AssetsEdgeEventFetcher extends BasePageableEdgeEventFetcher @Override EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, Asset asset) { - return EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ASSET, + return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ASSET, EdgeEventActionType.ADDED, asset.getId(), null); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseUsersEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseUsersEdgeEventFetcher.java index 606cd6eaca..6791ba69a6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseUsersEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseUsersEdgeEventFetcher.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.fetch; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; @@ -26,7 +27,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.user.UserService; -import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; @Slf4j @AllArgsConstructor @@ -41,7 +41,7 @@ public abstract class BaseUsersEdgeEventFetcher extends BasePageableEdgeEventFet @Override EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, User user) { - return EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER, + return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseWidgetsBundlesEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseWidgetsBundlesEdgeEventFetcher.java index 3f8beb861d..709c438739 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseWidgetsBundlesEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseWidgetsBundlesEdgeEventFetcher.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.fetch; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; @@ -26,7 +27,6 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.widget.WidgetsBundle; import org.thingsboard.server.dao.widget.WidgetsBundleService; -import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; @Slf4j @AllArgsConstructor @@ -41,7 +41,7 @@ public abstract class BaseWidgetsBundlesEdgeEventFetcher extends BasePageableEdg @Override EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, WidgetsBundle widgetsBundle) { - return EdgeEventUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGETS_BUNDLE, + return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGETS_BUNDLE, EdgeEventActionType.ADDED, widgetsBundle.getId(), null); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerEdgeEventFetcher.java index adaac874a0..1d5c618a0a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerEdgeEventFetcher.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc.fetch; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; @@ -24,7 +25,6 @@ import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; import java.util.ArrayList; import java.util.List; @@ -41,7 +41,7 @@ public class CustomerEdgeEventFetcher implements EdgeEventFetcher { @Override public PageData fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) { List result = new ArrayList<>(); - result.add(EdgeEventUtils.constructEdgeEvent(edge.getTenantId(), edge.getId(), + result.add(EdgeUtils.constructEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, edge.getCustomerId(), null)); // @voba - returns PageData object to be in sync with other fetchers return new PageData<>(result, 1, result.size(), false); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/DashboardsEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/DashboardsEdgeEventFetcher.java index f46ab01ee5..a6e7b3307c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/DashboardsEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/DashboardsEdgeEventFetcher.java @@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc.fetch; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.DashboardInfo; +import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; @@ -26,7 +27,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.dashboard.DashboardService; -import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; @AllArgsConstructor @Slf4j @@ -41,7 +41,7 @@ public class DashboardsEdgeEventFetcher extends BasePageableEdgeEventFetcher Date: Mon, 18 Apr 2022 10:52:50 +0300 Subject: [PATCH 2/7] Ack messages in case no edges are related. Refactored push to cloud/edge nodes --- .../engine/edge/AbstractTbMsgPushNode.java | 177 +++++++++++++ .../edge/BaseTbMsgPushNodeConfiguration.java | 33 +++ .../engine/edge/TbMsgPushToCloudNode.java | 39 ++- .../TbMsgPushToCloudNodeConfiguration.java | 7 +- .../rule/engine/edge/TbMsgPushToEdgeNode.java | 237 ++++++------------ .../TbMsgPushToEdgeNodeConfiguration.java | 7 +- .../engine/edge/TbMsgPushToEdgeNodeTest.java | 76 ++++++ 7 files changed, 390 insertions(+), 186 deletions(-) create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/BaseTbMsgPushNodeConfiguration.java create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNodeTest.java diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java new file mode 100644 index 0000000000..7ba94779d1 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java @@ -0,0 +1,177 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.edge; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.session.SessionMsgType; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +@Slf4j +public abstract class AbstractTbMsgPushNode implements TbNode { + + protected T config; + + private static final String SCOPE = "scope"; + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, getConfigClazz()); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + if (getIgnoredMessageSource().equalsIgnoreCase(msg.getMetaData().getValue(DataConstants.MSG_SOURCE_KEY))) { + log.debug("Ignoring msg from the {}, msg [{}]", getIgnoredMessageSource(), msg); + ctx.ack(msg); + return; + } + if (isSupportedOriginator(msg.getOriginator().getEntityType())) { + if (isSupportedMsgType(msg.getType())) { + processMsg(ctx, msg); + } else { + String errMsg = String.format("Unsupported msg type %s", msg.getType()); + log.debug(errMsg); + ctx.tellFailure(msg, new RuntimeException(errMsg)); + } + } else { + String errMsg = String.format("Unsupported originator type %s", msg.getOriginator().getEntityType()); + log.debug(errMsg); + ctx.tellFailure(msg, new RuntimeException(errMsg)); + } + } + + @Override + public void destroy() { + } + + protected S buildEvent(TbMsg msg, TbContext ctx) { + String msgType = msg.getType(); + if (DataConstants.ALARM.equals(msgType)) { + return buildEvent(ctx.getTenantId(), EdgeEventActionType.ADDED, getUUIDFromMsgData(msg), getAlarmEventType(), null); + } else { + U eventTypeByEntityType = getEventTypeByEntityType(msg.getOriginator().getEntityType()); + if (eventTypeByEntityType == null) { + return null; + } + EdgeEventActionType actionType = getEdgeEventActionTypeByMsgType(msgType); + Map entityBody = new HashMap<>(); + Map metadata = msg.getMetaData().getData(); + JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData()); + switch (actionType) { + case ATTRIBUTES_UPDATED: + case POST_ATTRIBUTES: + entityBody.put("kv", dataJson); + entityBody.put(SCOPE, getScope(metadata)); + if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)) { + entityBody.put("isPostAttributes", true); + } + break; + case ATTRIBUTES_DELETED: + List keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() {}); + entityBody.put("keys", keys); + entityBody.put(SCOPE, getScope(metadata)); + break; + case TIMESERIES_UPDATED: + entityBody.put("data", dataJson); + entityBody.put("ts", metadata.get("ts")); + break; + } + return buildEvent(ctx.getTenantId(), actionType, msg.getOriginator().getId(), eventTypeByEntityType, JacksonUtil.valueToTree(entityBody)); + } + } + + abstract S buildEvent(TenantId tenantId, EdgeEventActionType edgeEventAction, UUID entityId, U edgeEventType, JsonNode entityBody); + + abstract U getEventTypeByEntityType(EntityType entityType); + + abstract U getAlarmEventType(); + + abstract String getIgnoredMessageSource(); + + abstract protected Class getConfigClazz(); + + abstract void processMsg(TbContext ctx, TbMsg msg); + + protected UUID getUUIDFromMsgData(TbMsg msg) { + JsonNode data = JacksonUtil.toJsonNode(msg.getData()).get("id"); + String id = JacksonUtil.convertValue(data.get("id"), String.class); + return UUID.fromString(id); + } + + protected String getScope(Map metadata) { + String scope = metadata.get(SCOPE); + if (StringUtils.isEmpty(scope)) { + scope = config.getScope(); + } + return scope; + } + + protected EdgeEventActionType getEdgeEventActionTypeByMsgType(String msgType) { + EdgeEventActionType actionType; + if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)) { + actionType = EdgeEventActionType.TIMESERIES_UPDATED; + } else if (DataConstants.ATTRIBUTES_UPDATED.equals(msgType)) { + actionType = EdgeEventActionType.ATTRIBUTES_UPDATED; + } else if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)) { + actionType = EdgeEventActionType.POST_ATTRIBUTES; + } else { + actionType = EdgeEventActionType.ATTRIBUTES_DELETED; + } + return actionType; + } + + protected boolean isSupportedMsgType(String msgType) { + return SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType) + || SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType) + || DataConstants.ATTRIBUTES_UPDATED.equals(msgType) + || DataConstants.ATTRIBUTES_DELETED.equals(msgType) + || DataConstants.TIMESERIES_UPDATED.equals(msgType) + || DataConstants.ALARM.equals(msgType); + } + + protected boolean isSupportedOriginator(EntityType entityType) { + switch (entityType) { + case DEVICE: + case ASSET: + case ENTITY_VIEW: + case DASHBOARD: + case TENANT: + case CUSTOMER: + case EDGE: + return true; + default: + return false; + } + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/BaseTbMsgPushNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/BaseTbMsgPushNodeConfiguration.java new file mode 100644 index 0000000000..84ad05f178 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/BaseTbMsgPushNodeConfiguration.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.edge; + +import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; +import org.thingsboard.server.common.data.DataConstants; + +@Data +public class BaseTbMsgPushNodeConfiguration implements NodeConfiguration { + + private String scope; + + @Override + public BaseTbMsgPushNodeConfiguration defaultConfiguration() { + BaseTbMsgPushNodeConfiguration configuration = new BaseTbMsgPushNodeConfiguration(); + configuration.setScope(DataConstants.SERVER_SCOPE); + return configuration; + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToCloudNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToCloudNode.java index 013f9a3fc4..e8524484a3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToCloudNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToCloudNode.java @@ -15,17 +15,19 @@ */ package org.thingsboard.rule.engine.edge; +import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNode; -import org.thingsboard.rule.engine.api.TbNodeConfiguration; -import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.common.msg.TbMsg; +import java.util.UUID; + @Slf4j @RuleNode( type = ComponentType.ACTION, @@ -57,22 +59,37 @@ import org.thingsboard.server.common.msg.TbMsg; icon = "cloud_upload", ruleChainTypes = RuleChainType.EDGE ) -public class TbMsgPushToCloudNode implements TbNode { +public class TbMsgPushToCloudNode extends AbstractTbMsgPushNode { - private TbMsgPushToCloudNodeConfiguration config; + // Implementation of this node is done on the Edge @Override - public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { - this.config = TbNodeUtils.convert(configuration, TbMsgPushToCloudNodeConfiguration.class); + Object buildEvent(TenantId tenantId, EdgeEventActionType edgeEventAction, UUID entityId, Object edgeEventType, JsonNode entityBody) { + return null; } @Override - public void onMsg(TbContext ctx, TbMsg msg) { - // Implementation of this node is done on the Edge + Object getEventTypeByEntityType(EntityType entityType) { + return null; } @Override - public void destroy() { + Object getAlarmEventType() { + return null; + } + + @Override + String getIgnoredMessageSource() { + return null; + } + + @Override + protected Class getConfigClazz() { + return TbMsgPushToCloudNodeConfiguration.class; + } + + @Override + void processMsg(TbContext ctx, TbMsg msg) { } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToCloudNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToCloudNodeConfiguration.java index 6dcbf23a32..2c9e76fe61 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToCloudNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToCloudNodeConfiguration.java @@ -16,13 +16,12 @@ package org.thingsboard.rule.engine.edge; import lombok.Data; -import org.thingsboard.rule.engine.api.NodeConfiguration; +import lombok.EqualsAndHashCode; import org.thingsboard.server.common.data.DataConstants; +@EqualsAndHashCode(callSuper = true) @Data -public class TbMsgPushToCloudNodeConfiguration implements NodeConfiguration { - - private String scope; +public class TbMsgPushToCloudNodeConfiguration extends BaseTbMsgPushNodeConfiguration { @Override public TbMsgPushToCloudNodeConfiguration defaultConfiguration() { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java index f8bd44bb61..15843d3f56 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java @@ -15,23 +15,13 @@ */ package org.thingsboard.rule.engine.edge; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNode; -import org.thingsboard.rule.engine.api.TbNodeConfiguration; -import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; @@ -42,12 +32,7 @@ import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.session.SessionMsgType; -import javax.annotation.Nullable; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.UUID; import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; @@ -84,119 +69,13 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; icon = "cloud_download", ruleChainTypes = RuleChainType.CORE ) -public class TbMsgPushToEdgeNode implements TbNode { +public class TbMsgPushToEdgeNode extends AbstractTbMsgPushNode { - private TbMsgPushToEdgeNodeConfiguration config; - - private static final String SCOPE = "scope"; - - private static final int DEFAULT_PAGE_SIZE = 1000; + static final int DEFAULT_PAGE_SIZE = 100; @Override - public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { - this.config = TbNodeUtils.convert(configuration, TbMsgPushToEdgeNodeConfiguration.class); - } - - @Override - public void onMsg(TbContext ctx, TbMsg msg) { - if (DataConstants.EDGE_MSG_SOURCE.equalsIgnoreCase(msg.getMetaData().getValue(DataConstants.MSG_SOURCE_KEY))) { - log.debug("Ignoring msg from the cloud, msg [{}]", msg); - ctx.ack(msg); - return; - } - if (isSupportedOriginator(msg.getOriginator().getEntityType())) { - if (isSupportedMsgType(msg.getType())) { - processMsg(ctx, msg); - } else { - log.debug("Unsupported msg type {}", msg.getType()); - ctx.tellFailure(msg, new RuntimeException("Unsupported msg type '" + msg.getType() + "'")); - } - } else { - log.debug("Unsupported originator type {}", msg.getOriginator().getEntityType()); - ctx.tellFailure(msg, new RuntimeException("Unsupported originator type '" + msg.getOriginator().getEntityType() + "'")); - } - } - - private void processMsg(TbContext ctx, TbMsg msg) { - if (EntityType.EDGE.equals(msg.getOriginator().getEntityType())) { - EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx); - if (edgeEvent != null) { - EdgeId edgeId = new EdgeId(msg.getOriginator().getId()); - notifyEdge(ctx, msg, edgeEvent, edgeId); - } - } else { - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; - do { - pageData = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator(), pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - for (EdgeId edgeId : pageData.getData()) { - EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx); - if (edgeEvent == null) { - log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType()); - ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'")); - } else { - notifyEdge(ctx, msg, edgeEvent, edgeId); - } - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } - } - } while (pageData != null && pageData.hasNext()); - } - } - - private void notifyEdge(TbContext ctx, TbMsg msg, EdgeEvent edgeEvent, EdgeId edgeId) { - edgeEvent.setEdgeId(edgeId); - ctx.getEdgeEventService().save(edgeEvent); - ctx.tellNext(msg, SUCCESS); - ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId); - } - - private EdgeEvent buildEdgeEvent(TbMsg msg, TbContext ctx) { - String msgType = msg.getType(); - if (DataConstants.ALARM.equals(msgType)) { - return buildEdgeEvent(ctx.getTenantId(), EdgeEventActionType.ADDED, getUUIDFromMsgData(msg), EdgeEventType.ALARM, null); - } else { - EdgeEventType edgeEventTypeByEntityType = EdgeUtils.getEdgeEventTypeByEntityType(msg.getOriginator().getEntityType()); - if (edgeEventTypeByEntityType == null) { - return null; - } - EdgeEventActionType actionType = getEdgeEventActionTypeByMsgType(msgType); - Map entityBody = new HashMap<>(); - Map metadata = msg.getMetaData().getData(); - JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData()); - switch (actionType) { - case ATTRIBUTES_UPDATED: - case POST_ATTRIBUTES: - entityBody.put("kv", dataJson); - entityBody.put(SCOPE, getScope(metadata)); - break; - case ATTRIBUTES_DELETED: - List keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() { - }); - entityBody.put("keys", keys); - entityBody.put(SCOPE, getScope(metadata)); - break; - case TIMESERIES_UPDATED: - entityBody.put("data", dataJson); - entityBody.put("ts", metadata.get("ts")); - break; - } - return buildEdgeEvent(ctx.getTenantId(), actionType, msg.getOriginator().getId(), edgeEventTypeByEntityType, JacksonUtil.valueToTree(entityBody)); - } - } - - private String getScope(Map metadata) { - String scope = metadata.get(SCOPE); - if (StringUtils.isEmpty(scope)) { - scope = config.getScope(); - } - return scope; - } - - private EdgeEvent buildEdgeEvent(TenantId tenantId, EdgeEventActionType edgeEventAction, UUID entityId, EdgeEventType edgeEventType, JsonNode entityBody) { + EdgeEvent buildEvent(TenantId tenantId, EdgeEventActionType edgeEventAction, UUID entityId, + EdgeEventType edgeEventType, JsonNode entityBody) { EdgeEvent edgeEvent = new EdgeEvent(); edgeEvent.setTenantId(tenantId); edgeEvent.setAction(edgeEventAction); @@ -206,51 +85,75 @@ public class TbMsgPushToEdgeNode implements TbNode { return edgeEvent; } - private UUID getUUIDFromMsgData(TbMsg msg) { - JsonNode data = JacksonUtil.toJsonNode(msg.getData()).get("id"); - String id = JacksonUtil.convertValue(data.get("id"), String.class); - return UUID.fromString(id); - } - - private EdgeEventActionType getEdgeEventActionTypeByMsgType(String msgType) { - EdgeEventActionType actionType; - if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)) { - actionType = EdgeEventActionType.TIMESERIES_UPDATED; - } else if (DataConstants.ATTRIBUTES_UPDATED.equals(msgType)) { - actionType = EdgeEventActionType.ATTRIBUTES_UPDATED; - } else if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)) { - actionType = EdgeEventActionType.POST_ATTRIBUTES; - } else { - actionType = EdgeEventActionType.ATTRIBUTES_DELETED; - } - return actionType; - } - - private boolean isSupportedOriginator(EntityType entityType) { - switch (entityType) { - case DEVICE: - case ASSET: - case ENTITY_VIEW: - case DASHBOARD: - case TENANT: - case CUSTOMER: - case EDGE: - return true; - default: - return false; - } - } - - private boolean isSupportedMsgType(String msgType) { - return SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType) - || SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType) - || DataConstants.ATTRIBUTES_UPDATED.equals(msgType) - || DataConstants.ATTRIBUTES_DELETED.equals(msgType) - || DataConstants.ALARM.equals(msgType); + @Override + EdgeEventType getEventTypeByEntityType(EntityType entityType) { + return EdgeUtils.getEdgeEventTypeByEntityType(entityType); } @Override - public void destroy() { + EdgeEventType getAlarmEventType() { + return EdgeEventType.ALARM; + } + + @Override + String getIgnoredMessageSource() { + return DataConstants.EDGE_MSG_SOURCE; + } + + @Override + protected Class getConfigClazz() { + return TbMsgPushToEdgeNodeConfiguration.class; + } + + protected void processMsg(TbContext ctx, TbMsg msg) { + if (EntityType.EDGE.equals(msg.getOriginator().getEntityType())) { + EdgeEvent edgeEvent = buildEvent(msg, ctx); + if (edgeEvent != null) { + EdgeId edgeId = new EdgeId(msg.getOriginator().getId()); + notifyEdge(ctx, msg, edgeEvent, edgeId); + } else { + tellFailure(ctx, msg); + } + } else { + PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); + PageData pageData; + boolean edgeNotified = false; + do { + pageData = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator(), pageLink); + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { + for (EdgeId edgeId : pageData.getData()) { + EdgeEvent edgeEvent = buildEvent(msg, ctx); + if (edgeEvent != null) { + notifyEdge(ctx, msg, edgeEvent, edgeId); + edgeNotified = true; + } else { + tellFailure(ctx, msg); + } + } + if (pageData.hasNext()) { + pageLink = pageLink.nextPageLink(); + } + } + } while (pageData != null && pageData.hasNext()); + + if (!edgeNotified) { + // ack in case no edges are related to provided entity + ctx.ack(msg); + } + } + } + + private void tellFailure(TbContext ctx, TbMsg msg) { + String errMsg = String.format("Edge event type is null. Entity Type %s", msg.getOriginator().getEntityType()); + log.warn(errMsg); + ctx.tellFailure(msg, new RuntimeException(errMsg)); + } + + private void notifyEdge(TbContext ctx, TbMsg msg, EdgeEvent edgeEvent, EdgeId edgeId) { + edgeEvent.setEdgeId(edgeId); + ctx.getEdgeEventService().save(edgeEvent); + ctx.tellNext(msg, SUCCESS); + ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNodeConfiguration.java index 05015e1a48..6050fc0e90 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNodeConfiguration.java @@ -16,13 +16,12 @@ package org.thingsboard.rule.engine.edge; import lombok.Data; -import org.thingsboard.rule.engine.api.NodeConfiguration; +import lombok.EqualsAndHashCode; import org.thingsboard.server.common.data.DataConstants; +@EqualsAndHashCode(callSuper = true) @Data -public class TbMsgPushToEdgeNodeConfiguration implements NodeConfiguration { - - private String scope; +public class TbMsgPushToEdgeNodeConfiguration extends BaseTbMsgPushNodeConfiguration { @Override public TbMsgPushToEdgeNodeConfiguration defaultConfiguration() { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNodeTest.java new file mode 100644 index 0000000000..b7dbda1780 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNodeTest.java @@ -0,0 +1,76 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.edge; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgDataType; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.session.SessionMsgType; +import org.thingsboard.server.dao.edge.EdgeService; + +import java.util.UUID; + +import static org.mockito.Mockito.verify; + +@RunWith(MockitoJUnitRunner.class) +public class TbMsgPushToEdgeNodeTest { + + TbMsgPushToEdgeNode node; + + private final TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); + private final DeviceId deviceId = new DeviceId(UUID.randomUUID()); + + @Mock + private TbContext ctx; + + @Mock + private EdgeService edgeService; + + @Before + public void setUp() throws TbNodeException { + node = new TbMsgPushToEdgeNode(); + TbMsgPushToEdgeNodeConfiguration config = new TbMsgPushToEdgeNodeConfiguration().defaultConfiguration(); + node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + } + + @Test + public void ackMsgInCaseNoEdgeRelated() { + Mockito.when(ctx.getTenantId()).thenReturn(tenantId); + Mockito.when(ctx.getEdgeService()).thenReturn(edgeService); + Mockito.when(edgeService.findRelatedEdgeIdsByEntityId(tenantId, deviceId, new PageLink(TbMsgPushToEdgeNode.DEFAULT_PAGE_SIZE))).thenReturn(new PageData<>()); + + TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, new TbMsgMetaData(), + TbMsgDataType.JSON, "{}", null, null); + + node.onMsg(ctx, msg); + + verify(ctx).ack(msg); + } +} From 978861e56c1557e78f3e61ae026a9f01691db717 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 18 Apr 2022 11:35:07 +0300 Subject: [PATCH 3/7] Minor refactoring. Added missing timeseries_updated message type --- .../rule/engine/edge/AbstractTbMsgPushNode.java | 5 +++-- .../rule/engine/edge/TbMsgPushToCloudNode.java | 2 +- .../rule/engine/edge/TbMsgPushToEdgeNode.java | 9 +++++---- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java index 7ba94779d1..f8865bd4fd 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java @@ -111,7 +111,7 @@ public abstract class AbstractTbMsgPushNode Date: Mon, 18 Apr 2022 18:02:01 +0300 Subject: [PATCH 4/7] Removed objectmapper from edge service/session --- .../thingsboard/server/service/edge/rpc/EdgeGrpcService.java | 4 +--- .../thingsboard/server/service/edge/rpc/EdgeGrpcSession.java | 5 +---- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 65ca6b4ea6..c56fcd4ed9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.edge.rpc; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import io.grpc.Server; @@ -71,7 +70,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private final ConcurrentMap sessionNewEventsLocks = new ConcurrentHashMap<>(); private final Map sessionNewEvents = new HashMap<>(); private final ConcurrentMap> sessionEdgeEventChecks = new ConcurrentHashMap<>(); - private static final ObjectMapper mapper = new ObjectMapper(); @Value("${edges.rpc.port}") private int rpcPort; @@ -159,7 +157,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i @Override public StreamObserver handleMsgs(StreamObserver outputStream) { - return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper, sendDownlinkExecutorService).getInputStream(); + return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, sendDownlinkExecutorService).getInputStream(); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index e4f32f6a89..2ce1b73520 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -16,7 +16,6 @@ package org.thingsboard.server.service.edge.rpc; import com.datastax.oss.driver.api.core.uuid.Uuids; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -96,7 +95,6 @@ public final class EdgeGrpcSession implements Closeable { private final UUID sessionId; private final BiConsumer sessionOpenListener; private final Consumer sessionCloseListener; - private final ObjectMapper mapper; private final EdgeSessionState sessionState = new EdgeSessionState(); @@ -112,13 +110,12 @@ public final class EdgeGrpcSession implements Closeable { private ScheduledExecutorService sendDownlinkExecutorService; EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver outputStream, BiConsumer sessionOpenListener, - Consumer sessionCloseListener, ObjectMapper mapper, ScheduledExecutorService sendDownlinkExecutorService) { + Consumer sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService) { this.sessionId = UUID.randomUUID(); this.ctx = ctx; this.outputStream = outputStream; this.sessionOpenListener = sessionOpenListener; this.sessionCloseListener = sessionCloseListener; - this.mapper = mapper; this.sendDownlinkExecutorService = sendDownlinkExecutorService; initInputStream(); } From 5b4e92974d8246e783c3839daca6f32395809779 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 19 Apr 2022 10:21:26 +0300 Subject: [PATCH 5/7] Use getMetadataTs method instead of direct fetch from metadata --- .../thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java index f8865bd4fd..93257ac48c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java @@ -93,7 +93,7 @@ public abstract class AbstractTbMsgPushNode Date: Tue, 19 Apr 2022 16:29:58 +0300 Subject: [PATCH 6/7] Fix concurrent modification issue during warn logging --- .../server/service/edge/rpc/EdgeGrpcSession.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 2ce1b73520..e0e1ccad5b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -399,11 +399,15 @@ public final class EdgeGrpcSession implements Closeable { Runnable sendDownlinkMsgsTask = () -> { try { if (isConnected() && sessionState.getPendingMsgsMap().values().size() > 0) { + List copy = null; if (!firstRun) { - log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, sessionState.getPendingMsgsMap().values()); + copy = new ArrayList<>(sessionState.getPendingMsgsMap().values()); + log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, copy); } - log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, sessionState.getPendingMsgsMap().values().size()); - List copy = new ArrayList<>(sessionState.getPendingMsgsMap().values()); + if (copy == null) { + copy = new ArrayList<>(sessionState.getPendingMsgsMap().values()); + } + log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, copy.size()); for (DownlinkMsg downlinkMsg : copy) { sendDownlinkMsg(ResponseMsg.newBuilder() .setDownlinkMsg(downlinkMsg) From f5b22510af0e082dd8c2e8112e6acc61afd6f657 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 20 Apr 2022 12:57:04 +0300 Subject: [PATCH 7/7] Reduced code complexity --- .../server/service/edge/rpc/EdgeGrpcSession.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index e0e1ccad5b..de0cea14d6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -399,14 +399,10 @@ public final class EdgeGrpcSession implements Closeable { Runnable sendDownlinkMsgsTask = () -> { try { if (isConnected() && sessionState.getPendingMsgsMap().values().size() > 0) { - List copy = null; + List copy = new ArrayList<>(sessionState.getPendingMsgsMap().values()); if (!firstRun) { - copy = new ArrayList<>(sessionState.getPendingMsgsMap().values()); log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, copy); } - if (copy == null) { - copy = new ArrayList<>(sessionState.getPendingMsgsMap().values()); - } log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, copy.size()); for (DownlinkMsg downlinkMsg : copy) { sendDownlinkMsg(ResponseMsg.newBuilder()