diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index fb1e17493b..d6cac66759 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -77,7 +77,7 @@ public class EdgeEventSourcingListener { log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event); EdgeEventActionType action = Boolean.TRUE.equals(event.getAdded()) ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED; tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), - null, null, action, edgeSynchronizationManager.getEdgeId()); + null, null, action, edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event); } @@ -89,7 +89,7 @@ public class EdgeEventSourcingListener { log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED, - edgeSynchronizationManager.getEdgeId()); + edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event); } @@ -101,7 +101,7 @@ public class EdgeEventSourcingListener { log.trace("[{}] ActionEntityEvent called: {}", event.getTenantId(), event); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(), event.getBody(), null, edgeTypeByActionType(event.getActionType()), - edgeSynchronizationManager.getEdgeId()); + edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event); } @@ -122,7 +122,7 @@ public class EdgeEventSourcingListener { log.trace("[{}] RelationActionEvent called: {}", event.getTenantId(), event); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, null, JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType()), - edgeSynchronizationManager.getEdgeId()); + edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/AbstractRuleChainMetadataConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/AbstractRuleChainMetadataConstructor.java index f33f0f84e8..a720d0aab6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/AbstractRuleChainMetadataConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/AbstractRuleChainMetadataConstructor.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.edge.rpc.constructor.rule; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -50,7 +49,7 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM constructRuleChainMetadataUpdatedMsg(tenantId, builder, ruleChainMetaData); builder.setMsgType(msgType); return builder.build(); - } catch (JsonProcessingException ex) { + } catch (Exception ex) { log.error("[{}] Can't construct RuleChainMetadataUpdateMsg", tenantId, ex); } return null; @@ -58,7 +57,7 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM protected abstract void constructRuleChainMetadataUpdatedMsg(TenantId tenantId, RuleChainMetadataUpdateMsg.Builder builder, - RuleChainMetaData ruleChainMetaData) throws JsonProcessingException; + RuleChainMetaData ruleChainMetaData); protected List constructConnections(List connections) { List result = new ArrayList<>(); @@ -78,7 +77,7 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM .build(); } - protected List constructNodes(List nodes) throws JsonProcessingException { + protected List constructNodes(List nodes) { List result = new ArrayList<>(); if (nodes != null && !nodes.isEmpty()) { for (RuleNode node : nodes) { @@ -88,22 +87,22 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM return result; } - private RuleNodeProto constructNode(RuleNode node) throws JsonProcessingException { + private RuleNodeProto constructNode(RuleNode node) { return RuleNodeProto.newBuilder() .setIdMSB(node.getId().getId().getMostSignificantBits()) .setIdLSB(node.getId().getId().getLeastSignificantBits()) .setType(node.getType()) .setName(node.getName()) .setDebugMode(node.isDebugMode()) - .setConfiguration(JacksonUtil.OBJECT_MAPPER.writeValueAsString(node.getConfiguration())) - .setAdditionalInfo(JacksonUtil.OBJECT_MAPPER.writeValueAsString(node.getAdditionalInfo())) + .setConfiguration(JacksonUtil.toString(node.getConfiguration())) + .setAdditionalInfo(JacksonUtil.toString(node.getAdditionalInfo())) .setSingletonMode(node.isSingletonMode()) .setConfigurationVersion(node.getConfigurationVersion()) .build(); } protected List constructRuleChainConnections(List ruleChainConnections, - NavigableSet removedNodeIndexes) throws JsonProcessingException { + NavigableSet removedNodeIndexes) { List result = new ArrayList<>(); if (ruleChainConnections != null && !ruleChainConnections.isEmpty()) { for (RuleChainConnectionInfo ruleChainConnectionInfo : ruleChainConnections) { @@ -127,13 +126,13 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM return result; } - private RuleChainConnectionInfoProto constructRuleChainConnection(RuleChainConnectionInfo ruleChainConnectionInfo) throws JsonProcessingException { + private RuleChainConnectionInfoProto constructRuleChainConnection(RuleChainConnectionInfo ruleChainConnectionInfo) { return RuleChainConnectionInfoProto.newBuilder() .setFromIndex(ruleChainConnectionInfo.getFromIndex()) .setTargetRuleChainIdMSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getMostSignificantBits()) .setTargetRuleChainIdLSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getLeastSignificantBits()) .setType(ruleChainConnectionInfo.getType()) - .setAdditionalInfo(JacksonUtil.OBJECT_MAPPER.writeValueAsString(ruleChainConnectionInfo.getAdditionalInfo())) + .setAdditionalInfo(JacksonUtil.toString(ruleChainConnectionInfo.getAdditionalInfo())) .build(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/RuleChainMetadataConstructorV330.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/RuleChainMetadataConstructorV330.java index 5e1310a5a5..33fc39b7e7 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/RuleChainMetadataConstructorV330.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/RuleChainMetadataConstructorV330.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.edge.rpc.constructor.rule; -import com.fasterxml.jackson.core.JsonProcessingException; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.flow.TbRuleChainInputNode; @@ -45,7 +44,7 @@ public class RuleChainMetadataConstructorV330 extends AbstractRuleChainMetadataC @Override protected void constructRuleChainMetadataUpdatedMsg(TenantId tenantId, RuleChainMetadataUpdateMsg.Builder builder, - RuleChainMetaData ruleChainMetaData) throws JsonProcessingException { + RuleChainMetaData ruleChainMetaData) { List supportedNodes = filterNodes(ruleChainMetaData.getNodes()); NavigableSet removedNodeIndexes = getRemovedNodeIndexes(ruleChainMetaData.getNodes(), ruleChainMetaData.getConnections()); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/RuleChainMetadataConstructorV340.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/RuleChainMetadataConstructorV340.java index 6fe39942ad..9a37e416db 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/RuleChainMetadataConstructorV340.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/RuleChainMetadataConstructorV340.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.edge.rpc.constructor.rule; -import com.fasterxml.jackson.core.JsonProcessingException; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.rule.RuleChainMetaData; @@ -29,7 +28,7 @@ public class RuleChainMetadataConstructorV340 extends AbstractRuleChainMetadataC @Override protected void constructRuleChainMetadataUpdatedMsg(TenantId tenantId, RuleChainMetadataUpdateMsg.Builder builder, - RuleChainMetaData ruleChainMetaData) throws JsonProcessingException { + RuleChainMetaData ruleChainMetaData) { builder.addAllNodes(constructNodes(ruleChainMetaData.getNodes())) .addAllConnections(constructConnections(ruleChainMetaData.getConnections())) .addAllRuleChainConnections(constructRuleChainConnections(ruleChainMetaData.getRuleChainConnections(), new TreeSet<>())); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java index b9fc1c8c9e..e3a8c14012 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java @@ -79,19 +79,19 @@ public class AdminSettingsEdgeEventFetcher implements EdgeEventFetcher { AdminSettings systemMailSettings = adminSettingsService.findAdminSettingsByKey(TenantId.SYS_TENANT_ID, "mail"); result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, - EdgeEventActionType.UPDATED, null, JacksonUtil.OBJECT_MAPPER.valueToTree(systemMailSettings))); + EdgeEventActionType.UPDATED, null, JacksonUtil.valueToTree(systemMailSettings))); AdminSettings tenantMailSettings = convertToTenantAdminSettings(tenantId, systemMailSettings.getKey(), (ObjectNode) systemMailSettings.getJsonValue()); result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, - EdgeEventActionType.UPDATED, null, JacksonUtil.OBJECT_MAPPER.valueToTree(tenantMailSettings))); + EdgeEventActionType.UPDATED, null, JacksonUtil.valueToTree(tenantMailSettings))); AdminSettings systemMailTemplates = loadMailTemplates(tenantId); result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, - EdgeEventActionType.UPDATED, null, JacksonUtil.OBJECT_MAPPER.valueToTree(systemMailTemplates))); + EdgeEventActionType.UPDATED, null, JacksonUtil.valueToTree(systemMailTemplates))); AdminSettings tenantMailTemplates = convertToTenantAdminSettings(tenantId, systemMailTemplates.getKey(), (ObjectNode) systemMailTemplates.getJsonValue()); result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, - EdgeEventActionType.UPDATED, null, JacksonUtil.OBJECT_MAPPER.valueToTree(tenantMailTemplates))); + EdgeEventActionType.UPDATED, null, JacksonUtil.valueToTree(tenantMailTemplates))); // return PageData object to be in sync with other fetchers return new PageData<>(result, 1, result.size(), false); @@ -114,7 +114,7 @@ public class AdminSettingsEdgeEventFetcher implements EdgeEventFetcher { AdminSettings adminSettings = new AdminSettings(); adminSettings.setId(new AdminSettingsId(Uuids.timeBased())); adminSettings.setKey("mailTemplates"); - adminSettings.setJsonValue(JacksonUtil.OBJECT_MAPPER.convertValue(mailTemplates, JsonNode.class)); + adminSettings.setJsonValue(JacksonUtil.convertValue(mailTemplates, JsonNode.class)); return adminSettings; } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 9b964ed922..216278b9f9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -431,6 +431,8 @@ public abstract class BaseEdgeProcessor { return Futures.immediateFuture(null); } }, dbCallbackExecutorService); + } else { + return Futures.immediateFuture(null); } default: return Futures.immediateFuture(null); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java index 9e8871e5e6..b76e8f187d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java @@ -50,10 +50,10 @@ public class AlarmEdgeProcessor extends BaseAlarmProcessor { public ListenableFuture processAlarmMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmUpdateMsg alarmUpdateMsg) { log.trace("[{}] processAlarmMsgFromEdge [{}]", tenantId, alarmUpdateMsg); try { - edgeSynchronizationManager.getSync().set(edgeId); + edgeSynchronizationManager.getEdgeId().set(edgeId); return processAlarmMsg(tenantId, alarmUpdateMsg); } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } } @@ -77,7 +77,7 @@ public class AlarmEdgeProcessor extends BaseAlarmProcessor { case DELETED: Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), Alarm.class); List> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(), - alarmId, actionType, JacksonUtil.OBJECT_MAPPER.valueToTree(deletedAlarm), sourceEdgeId); + alarmId, actionType, JacksonUtil.valueToTree(deletedAlarm), sourceEdgeId); return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); default: ListenableFuture alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java index 7dbff33d1d..03a91ae2f0 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java @@ -123,7 +123,7 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { } break; case DELETED: - Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.convertValue(body, Alarm.class); + Alarm deletedAlarm = JacksonUtil.convertValue(body, Alarm.class); return alarmMsgConstructor.constructAlarmUpdatedMsg(msgType, deletedAlarm, findOriginatorEntityName(tenantId, deletedAlarm)); } return null; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java index e3570962f2..caa03af8a5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java @@ -50,10 +50,10 @@ import java.util.UUID; public class AssetEdgeProcessor extends BaseAssetProcessor { public ListenableFuture processAssetMsgFromEdge(TenantId tenantId, Edge edge, AssetUpdateMsg assetUpdateMsg) { - log.trace("[{}] executing processAssetMsgFromEdge [{}] from edge [{}]", tenantId, assetUpdateMsg, edge.getName()); + log.trace("[{}] executing processAssetMsgFromEdge [{}] from edge [{}]", tenantId, assetUpdateMsg, edge.getId()); AssetId assetId = new AssetId(new UUID(assetUpdateMsg.getIdMSB(), assetUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(edge.getId()); + edgeSynchronizationManager.getEdgeId().set(edge.getId()); switch (assetUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: @@ -78,7 +78,7 @@ public class AssetEdgeProcessor extends BaseAssetProcessor { return Futures.immediateFailedFuture(e); } } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java index 83d23f50d7..fb55edd8c5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java @@ -49,10 +49,10 @@ import java.util.UUID; public class AssetProfileEdgeProcessor extends BaseAssetProfileProcessor { public ListenableFuture processAssetProfileMsgFromEdge(TenantId tenantId, Edge edge, AssetProfileUpdateMsg assetProfileUpdateMsg) { - log.trace("[{}] executing processAssetProfileMsgFromEdge [{}] from edge [{}]", tenantId, assetProfileUpdateMsg, edge.getName()); + log.trace("[{}] executing processAssetProfileMsgFromEdge [{}] from edge [{}]", tenantId, assetProfileUpdateMsg, edge.getId()); AssetProfileId assetProfileId = new AssetProfileId(new UUID(assetProfileUpdateMsg.getIdMSB(), assetProfileUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(edge.getId()); + edgeSynchronizationManager.getEdgeId().set(edge.getId()); switch (assetProfileUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: @@ -68,7 +68,7 @@ public class AssetProfileEdgeProcessor extends BaseAssetProfileProcessor { log.warn("[{}] Failed to process AssetProfileUpdateMsg from Edge [{}]", tenantId, assetProfileUpdateMsg, e); return Futures.immediateFailedFuture(e); } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java index 69dc3c785d..291491a2fe 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java @@ -45,10 +45,10 @@ import java.util.UUID; public class DashboardEdgeProcessor extends BaseDashboardProcessor { public ListenableFuture processDashboardMsgFromEdge(TenantId tenantId, Edge edge, DashboardUpdateMsg dashboardUpdateMsg) { - log.trace("[{}] executing processDashboardMsgFromEdge [{}] from edge [{}]", tenantId, dashboardUpdateMsg, edge.getName()); + log.trace("[{}] executing processDashboardMsgFromEdge [{}] from edge [{}]", tenantId, dashboardUpdateMsg, edge.getId()); DashboardId dashboardId = new DashboardId(new UUID(dashboardUpdateMsg.getIdMSB(), dashboardUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(edge.getId()); + edgeSynchronizationManager.getEdgeId().set(edge.getId()); switch (dashboardUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: @@ -73,7 +73,7 @@ public class DashboardEdgeProcessor extends BaseDashboardProcessor { return Futures.immediateFailedFuture(e); } } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java index 97f37ba833..ad152f754b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java @@ -66,10 +66,10 @@ import java.util.UUID; public class DeviceEdgeProcessor extends BaseDeviceProcessor { public ListenableFuture processDeviceMsgFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { - log.trace("[{}] executing processDeviceMsgFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); + log.trace("[{}] executing processDeviceMsgFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getId()); DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(edge.getId()); + edgeSynchronizationManager.getEdgeId().set(edge.getId()); switch (deviceUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: @@ -94,18 +94,18 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor { return Futures.immediateFailedFuture(e); } } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } } public ListenableFuture processDeviceCredentialsMsgFromEdge(TenantId tenantId, EdgeId edgeId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) { log.debug("[{}] Executing processDeviceCredentialsMsgFromEdge, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg); try { - edgeSynchronizationManager.getSync().set(edgeId); + edgeSynchronizationManager.getEdgeId().set(edgeId); updateDeviceCredentials(tenantId, deviceCredentialsUpdateMsg); } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } return Futures.immediateFuture(null); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java index 4267adb8e2..3ed75bdb41 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java @@ -49,10 +49,10 @@ import java.util.UUID; public class DeviceProfileEdgeProcessor extends BaseDeviceProfileProcessor { public ListenableFuture processDeviceProfileMsgFromEdge(TenantId tenantId, Edge edge, DeviceProfileUpdateMsg deviceProfileUpdateMsg) { - log.trace("[{}] executing processDeviceProfileMsgFromEdge [{}] from edge [{}]", tenantId, deviceProfileUpdateMsg, edge.getName()); + log.trace("[{}] executing processDeviceProfileMsgFromEdge [{}] from edge [{}]", tenantId, deviceProfileUpdateMsg, edge.getId()); DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileUpdateMsg.getIdMSB(), deviceProfileUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(edge.getId()); + edgeSynchronizationManager.getEdgeId().set(edge.getId()); switch (deviceProfileUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: @@ -68,7 +68,7 @@ public class DeviceProfileEdgeProcessor extends BaseDeviceProfileProcessor { log.warn("[{}] Failed to process DeviceProfileUpdateMsg from Edge [{}]", tenantId, deviceProfileUpdateMsg, e); return Futures.immediateFailedFuture(e); } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java index e139bf28cc..cd3a0b35ad 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java @@ -46,10 +46,10 @@ import java.util.UUID; public class EntityViewEdgeProcessor extends BaseEntityViewProcessor { public ListenableFuture processEntityViewMsgFromEdge(TenantId tenantId, Edge edge, EntityViewUpdateMsg entityViewUpdateMsg) { - log.trace("[{}] executing processEntityViewMsgFromEdge [{}] from edge [{}]", tenantId, entityViewUpdateMsg, edge.getName()); + log.trace("[{}] executing processEntityViewMsgFromEdge [{}] from edge [{}]", tenantId, entityViewUpdateMsg, edge.getId()); EntityViewId entityViewId = new EntityViewId(new UUID(entityViewUpdateMsg.getIdMSB(), entityViewUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(edge.getId()); + edgeSynchronizationManager.getEdgeId().set(edge.getId()); switch (entityViewUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: @@ -74,7 +74,7 @@ public class EntityViewEdgeProcessor extends BaseEntityViewProcessor { return Futures.immediateFailedFuture(e); } } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java index a0599c334f..2c23fd42b2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java @@ -47,18 +47,18 @@ import java.util.Set; public class RelationEdgeProcessor extends BaseRelationProcessor { public ListenableFuture processRelationMsgFromEdge(TenantId tenantId, Edge edge, RelationUpdateMsg relationUpdateMsg) { - log.trace("[{}] executing processRelationMsgFromEdge [{}] from edge [{}]", tenantId, relationUpdateMsg, edge.getName()); + log.trace("[{}] executing processRelationMsgFromEdge [{}] from edge [{}]", tenantId, relationUpdateMsg, edge.getId()); try { - edgeSynchronizationManager.getSync().set(edge.getId()); + edgeSynchronizationManager.getEdgeId().set(edge.getId()); return processRelationMsg(tenantId, relationUpdateMsg); } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } } public DownlinkMsg convertRelationEventToDownlink(EdgeEvent edgeEvent) { - EntityRelation entityRelation = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), EntityRelation.class); + EntityRelation entityRelation = JacksonUtil.convertValue(edgeEvent.getBody(), EntityRelation.class); UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); RelationUpdateMsg relationUpdateMsg = relationMsgConstructor.constructRelationUpdatedMsg(msgType, entityRelation); return DownlinkMsg.newBuilder() @@ -87,7 +87,7 @@ public class RelationEdgeProcessor extends BaseRelationProcessor { EdgeEventType.RELATION, EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()), null, - JacksonUtil.OBJECT_MAPPER.valueToTree(relation))); + JacksonUtil.valueToTree(relation))); } return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/settings/AdminSettingsEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/settings/AdminSettingsEdgeProcessor.java index 6bd21e0796..bbd9edd172 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/settings/AdminSettingsEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/settings/AdminSettingsEdgeProcessor.java @@ -32,7 +32,10 @@ import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; public class AdminSettingsEdgeProcessor extends BaseEdgeProcessor { public DownlinkMsg convertAdminSettingsEventToDownlink(EdgeEvent edgeEvent) { - AdminSettings adminSettings = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), AdminSettings.class); + AdminSettings adminSettings = JacksonUtil.convertValue(edgeEvent.getBody(), AdminSettings.class); + if (adminSettings == null) { + return null; + } AdminSettingsUpdateMsg adminSettingsUpdateMsg = adminSettingsMsgConstructor.constructAdminSettingsUpdateMsg(adminSettings); return DownlinkMsg.newBuilder() .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index 46bb940da9..af9f72e819 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -176,7 +176,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { if (attributes.size() > 0) { entityData.put("kv", attributes); entityData.put("scope", scope); - JsonNode body = JacksonUtil.OBJECT_MAPPER.valueToTree(entityData); + JsonNode body = JacksonUtil.valueToTree(entityData); log.debug("[{}] Sending attributes data msg, entityId [{}], attributes [{}]", tenantId, entityId, body); future = saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body); } else { @@ -249,7 +249,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { EdgeEventType.RELATION, EdgeEventActionType.ADDED, null, - JacksonUtil.OBJECT_MAPPER.valueToTree(relation))); + JacksonUtil.valueToTree(relation))); } } catch (Exception e) { String errMsg = String.format("[%s][%s] Exception during loading relation [%s] to edge on sync!", tenantId, edge.getId(), relation); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeSynchronizationManager.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeSynchronizationManager.java index fe83234565..812b30dc9d 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeSynchronizationManager.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeSynchronizationManager.java @@ -19,7 +19,5 @@ import org.thingsboard.server.common.data.id.EdgeId; public interface EdgeSynchronizationManager { - ThreadLocal getSync(); - - EdgeId getEdgeId(); + ThreadLocal getEdgeId(); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/DefaultEdgeSynchronizationManager.java b/dao/src/main/java/org/thingsboard/server/dao/edge/DefaultEdgeSynchronizationManager.java index 4712f11529..3d7c48efbd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/DefaultEdgeSynchronizationManager.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/DefaultEdgeSynchronizationManager.java @@ -25,10 +25,5 @@ import org.thingsboard.server.common.data.id.EdgeId; public class DefaultEdgeSynchronizationManager implements EdgeSynchronizationManager { @Getter - private final ThreadLocal sync = new ThreadLocal<>(); - - @Override - public EdgeId getEdgeId() { - return this.sync.get(); - } + private final ThreadLocal edgeId = new ThreadLocal<>(); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java index 373e314577..66a26e33fa 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java @@ -104,7 +104,7 @@ public class TbSendRPCReplyNode implements TbNode { body.put("requestId", requestIdStr); body.put("response", msg.getData()); EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(ctx.getTenantId(), edgeId, EdgeEventType.DEVICE, - EdgeEventActionType.RPC_CALL, deviceId, JacksonUtil.OBJECT_MAPPER.valueToTree(body)); + EdgeEventActionType.RPC_CALL, deviceId, JacksonUtil.valueToTree(body)); ListenableFuture future = ctx.getEdgeEventService().saveAsync(edgeEvent); Futures.addCallback(future, new FutureCallback<>() { @Override