From e2520ae0f5dfd20926d1e0e9b0ff162d3eecf16b Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Tue, 5 Sep 2023 09:38:48 +0300 Subject: [PATCH 1/4] Provide edgeId for EdgeEventSourcingListener to handle cloud entities updated on Edge and process to other Edges --- .../edge/EdgeEventSourcingListener.java | 26 +++----- .../service/edge/rpc/EdgeGrpcSession.java | 6 +- .../edge/rpc/processor/BaseEdgeProcessor.java | 65 +++++++++++-------- .../processor/alarm/AlarmEdgeProcessor.java | 33 +++++++--- .../processor/alarm/BaseAlarmProcessor.java | 33 +++++----- .../processor/asset/AssetEdgeProcessor.java | 2 +- .../asset/AssetProfileEdgeProcessor.java | 2 +- .../dashboard/DashboardEdgeProcessor.java | 2 +- .../processor/device/BaseDeviceProcessor.java | 46 ++++++------- .../processor/device/DeviceEdgeProcessor.java | 16 ++++- .../device/DeviceProfileEdgeProcessor.java | 2 +- .../entityview/EntityViewEdgeProcessor.java | 2 +- .../relation/BaseRelationProcessor.java | 4 -- .../relation/RelationEdgeProcessor.java | 12 ++++ .../queue/DefaultTbClusterService.java | 6 +- .../controller/AbstractNotifyEntityTest.java | 16 ++--- .../server/edge/AssetEdgeTest.java | 2 +- .../server/edge/DeviceEdgeTest.java | 8 +-- .../sync/ie/ExportImportServiceSqlTest.java | 8 +-- .../server/cluster/TbClusterService.java | 2 +- common/cluster-api/src/main/proto/queue.proto | 2 + .../dao/edge/EdgeSynchronizationManager.java | 6 +- .../DefaultEdgeSynchronizationManager.java | 8 +-- .../dao/eventsourcing/DeleteEntityEvent.java | 1 - 24 files changed, 172 insertions(+), 138 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index 43b05094a4..fb1e17493b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -41,7 +41,6 @@ import javax.annotation.PostConstruct; import static org.thingsboard.server.service.entitiy.DefaultTbNotificationEntityService.edgeTypeByActionType; - /** * This event listener does not support async event processing because relay on ThreadLocal * Another possible approach is to implement a special annotation and a bunch of classes similar to TransactionalApplicationListener @@ -71,9 +70,6 @@ public class EdgeEventSourcingListener { @TransactionalEventListener(fallbackExecution = true) public void handleEvent(SaveEntityEvent event) { - if (edgeSynchronizationManager.isSync()) { - return; - } try { if (!isValidEdgeEventEntity(event.getEntity())) { return; @@ -81,7 +77,7 @@ public class EdgeEventSourcingListener { log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event); EdgeEventActionType action = Boolean.TRUE.equals(event.getAdded()) ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED; tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), - null, null, action); + null, null, action, edgeSynchronizationManager.getEdgeId()); } catch (Exception e) { log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event); } @@ -89,13 +85,11 @@ public class EdgeEventSourcingListener { @TransactionalEventListener(fallbackExecution = true) public void handleEvent(DeleteEntityEvent event) { - if (edgeSynchronizationManager.isSync()) { - return; - } try { log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event); - tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(), - JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED); + tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), + JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED, + edgeSynchronizationManager.getEdgeId()); } catch (Exception e) { log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event); } @@ -103,13 +97,11 @@ public class EdgeEventSourcingListener { @TransactionalEventListener(fallbackExecution = true) public void handleEvent(ActionEntityEvent event) { - if (edgeSynchronizationManager.isSync()) { - return; - } try { log.trace("[{}] ActionEntityEvent called: {}", event.getTenantId(), event); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(), - event.getBody(), null, edgeTypeByActionType(event.getActionType())); + event.getBody(), null, edgeTypeByActionType(event.getActionType()), + edgeSynchronizationManager.getEdgeId()); } catch (Exception e) { log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event); } @@ -117,9 +109,6 @@ public class EdgeEventSourcingListener { @TransactionalEventListener(fallbackExecution = true) public void handleEvent(RelationActionEvent event) { - if (edgeSynchronizationManager.isSync()) { - return; - } try { EntityRelation relation = event.getRelation(); if (relation == null) { @@ -132,7 +121,8 @@ public class EdgeEventSourcingListener { } log.trace("[{}] RelationActionEvent called: {}", event.getTenantId(), event); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, null, - JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType())); + JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType()), + edgeSynchronizationManager.getEdgeId()); } catch (Exception e) { log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index d5d73d47e7..63e3b82803 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -675,7 +675,7 @@ public final class EdgeGrpcSession implements Closeable { } if (uplinkMsg.getDeviceCredentialsUpdateMsgCount() > 0) { for (DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg : uplinkMsg.getDeviceCredentialsUpdateMsgList()) { - result.add(ctx.getDeviceProcessor().processDeviceCredentialsMsg(edge.getTenantId(), deviceCredentialsUpdateMsg)); + result.add(ctx.getDeviceProcessor().processDeviceCredentialsMsgFromEdge(edge.getTenantId(), edge.getId(), deviceCredentialsUpdateMsg)); } } if (uplinkMsg.getAssetProfileUpdateMsgCount() > 0) { @@ -690,7 +690,7 @@ public final class EdgeGrpcSession implements Closeable { } if (uplinkMsg.getAlarmUpdateMsgCount() > 0) { for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdateMsgList()) { - result.add(ctx.getAlarmProcessor().processAlarmMsg(edge.getTenantId(), alarmUpdateMsg)); + result.add(ctx.getAlarmProcessor().processAlarmMsgFromEdge(edge.getTenantId(), edge.getId(), alarmUpdateMsg)); } } if (uplinkMsg.getEntityViewUpdateMsgCount() > 0) { @@ -700,7 +700,7 @@ public final class EdgeGrpcSession implements Closeable { } if (uplinkMsg.getRelationUpdateMsgCount() > 0) { for (RelationUpdateMsg relationUpdateMsg : uplinkMsg.getRelationUpdateMsgList()) { - result.add(ctx.getRelationProcessor().processRelationMsg(edge.getTenantId(), relationUpdateMsg)); + result.add(ctx.getRelationProcessor().processRelationMsgFromEdge(edge.getTenantId(), edge, relationUpdateMsg)); } } if (uplinkMsg.getDashboardUpdateMsgCount() > 0) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 239f72dcd6..d527a8ea4d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -311,7 +311,9 @@ public abstract class BaseEdgeProcessor { }, dbCallbackExecutorService); } - protected ListenableFuture processActionForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId) { + protected ListenableFuture processActionForAllEdges(TenantId tenantId, EdgeEventType type, + EdgeEventActionType actionType, EntityId entityId, + EdgeId sourceEdgeId) { List> futures = new ArrayList<>(); if (TenantId.SYS_TENANT_ID.equals(tenantId)) { PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); @@ -319,21 +321,22 @@ public abstract class BaseEdgeProcessor { do { tenantsIds = tenantService.findTenantsIds(pageLink); for (TenantId tenantId1 : tenantsIds.getData()) { - futures.addAll(processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, null)); + futures.addAll(processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, null, sourceEdgeId)); } pageLink = pageLink.nextPageLink(); } while (tenantsIds.hasNext()); } else { - futures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null); + futures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null, sourceEdgeId); } return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } private List> processActionForAllEdgesByTenantId(TenantId tenantId, - EdgeEventType type, - EdgeEventActionType actionType, - EntityId entityId, - JsonNode body) { + EdgeEventType type, + EdgeEventActionType actionType, + EntityId entityId, + JsonNode body, + EdgeId sourceEdgeId) { PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); PageData pageData; List> futures = new ArrayList<>(); @@ -341,7 +344,9 @@ public abstract class BaseEdgeProcessor { pageData = edgeService.findEdgesByTenantId(tenantId, pageLink); if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { for (Edge edge : pageData.getData()) { - futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, body)); + if (!edge.getId().equals(sourceEdgeId)) { + futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, body)); + } } if (pageData.hasNext()) { pageLink = pageLink.nextPageLink(); @@ -385,11 +390,12 @@ public abstract class BaseEdgeProcessor { EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType()); EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(type, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); + EdgeId sourceEdgeId = safeGetEdgeId(edgeNotificationMsg.getSourceEdgeIdMSB(), edgeNotificationMsg.getSourceEdgeIdLSB()); if (type.isAllEdgesRelated()) { - return processEntityNotificationForAllEdges(tenantId, type, actionType, entityId); + return processEntityNotificationForAllEdges(tenantId, type, actionType, entityId, sourceEdgeId); } else { JsonNode body = JacksonUtil.toJsonNode(edgeNotificationMsg.getBody()); - EdgeId edgeId = safeGetEdgeId(edgeNotificationMsg); + EdgeId edgeId = safeGetEdgeId(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB()); switch (actionType) { case UPDATED: case CREDENTIALS_UPDATED: @@ -398,41 +404,44 @@ public abstract class BaseEdgeProcessor { if (edgeId != null) { return saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body); } else { - return processNotificationToRelatedEdges(tenantId, entityId, type, actionType); + return processNotificationToRelatedEdges(tenantId, entityId, type, actionType, sourceEdgeId); } case DELETED: EdgeEventActionType deleted = EdgeEventActionType.DELETED; if (edgeId != null) { return saveEdgeEvent(tenantId, edgeId, type, deleted, entityId, body); } else { - return Futures.transform(Futures.allAsList(processActionForAllEdgesByTenantId(tenantId, type, deleted, entityId, body)), + return Futures.transform(Futures.allAsList(processActionForAllEdgesByTenantId(tenantId, type, deleted, entityId, body, sourceEdgeId)), voids -> null, dbCallbackExecutorService); } case ASSIGNED_TO_EDGE: case UNASSIGNED_FROM_EDGE: - ListenableFuture future = saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body); - return Futures.transformAsync(future, unused -> { - if (type.equals(EdgeEventType.RULE_CHAIN)) { - return updateDependentRuleChains(tenantId, new RuleChainId(entityId.getId()), edgeId); - } else { - return Futures.immediateFuture(null); - } - }, dbCallbackExecutorService); + if (sourceEdgeId == null) { + ListenableFuture future = saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body); + return Futures.transformAsync(future, unused -> { + if (type.equals(EdgeEventType.RULE_CHAIN)) { + return updateDependentRuleChains(tenantId, new RuleChainId(entityId.getId()), edgeId); + } else { + return Futures.immediateFuture(null); + } + }, dbCallbackExecutorService); + } default: return Futures.immediateFuture(null); } } } - private EdgeId safeGetEdgeId(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { - if (edgeNotificationMsg.getEdgeIdMSB() != 0 && edgeNotificationMsg.getEdgeIdLSB() != 0) { - return new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB())); + protected EdgeId safeGetEdgeId(long edgeIdMSB, long edgeIdLSB) { + if (edgeIdMSB != 0 && edgeIdLSB != 0) { + return new EdgeId(new UUID(edgeIdMSB, edgeIdLSB)); } else { return null; } } - private ListenableFuture processNotificationToRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type, EdgeEventActionType actionType) { + private ListenableFuture processNotificationToRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type, + EdgeEventActionType actionType, EdgeId sourceEdgeId) { PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); PageData pageData; List> futures = new ArrayList<>(); @@ -440,7 +449,9 @@ public abstract class BaseEdgeProcessor { pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink); if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { for (EdgeId relatedEdgeId : pageData.getData()) { - futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null)); + if (!relatedEdgeId.equals(sourceEdgeId)) { + futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null)); + } } if (pageData.hasNext()) { pageLink = pageLink.nextPageLink(); @@ -483,13 +494,13 @@ public abstract class BaseEdgeProcessor { return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } - private ListenableFuture processEntityNotificationForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId) { + private ListenableFuture processEntityNotificationForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, EdgeId sourceEdgeId) { switch (actionType) { case ADDED: case UPDATED: case DELETED: case CREDENTIALS_UPDATED: // used by USER entity - return processActionForAllEdges(tenantId, type, actionType, entityId); + return processActionForAllEdges(tenantId, type, actionType, entityId, sourceEdgeId); default: return Futures.immediateFuture(null); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java index 0b86352c02..9e8871e5e6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java @@ -47,6 +47,16 @@ import java.util.UUID; @TbCoreComponent public class AlarmEdgeProcessor extends BaseAlarmProcessor { + public ListenableFuture processAlarmMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmUpdateMsg alarmUpdateMsg) { + log.trace("[{}] processAlarmMsgFromEdge [{}]", tenantId, alarmUpdateMsg); + try { + edgeSynchronizationManager.getSync().set(edgeId); + return processAlarmMsg(tenantId, alarmUpdateMsg); + } finally { + edgeSynchronizationManager.getSync().remove(); + } + } + public DownlinkMsg convertAlarmEventToDownlink(EdgeEvent edgeEvent) { AlarmUpdateMsg alarmUpdateMsg = convertAlarmEventToAlarmMsg(edgeEvent.getTenantId(), edgeEvent.getEntityId(), edgeEvent.getAction(), edgeEvent.getBody()); @@ -62,10 +72,12 @@ public class AlarmEdgeProcessor extends BaseAlarmProcessor { public ListenableFuture processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException { EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); + EdgeId sourceEdgeId = safeGetEdgeId(edgeNotificationMsg.getSourceEdgeIdMSB(), edgeNotificationMsg.getSourceEdgeIdLSB()); switch (actionType) { case DELETED: Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), Alarm.class); - List> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(), alarmId, actionType, JacksonUtil.OBJECT_MAPPER.valueToTree(deletedAlarm)); + List> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(), + alarmId, actionType, JacksonUtil.OBJECT_MAPPER.valueToTree(deletedAlarm), sourceEdgeId); return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); default: ListenableFuture alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); @@ -77,13 +89,14 @@ public class AlarmEdgeProcessor extends BaseAlarmProcessor { if (type == null) { return Futures.immediateFuture(null); } - List> futures = pushEventToAllRelatedEdges(tenantId, alarm.getOriginator(), alarmId, actionType, null); + List> futures = pushEventToAllRelatedEdges(tenantId, alarm.getOriginator(), + alarmId, actionType, null, sourceEdgeId); return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); }, dbCallbackExecutorService); } } - private List> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId, EdgeEventActionType actionType, JsonNode body) { + private List> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId, EdgeEventActionType actionType, JsonNode body, EdgeId sourceEdgeId) { PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); PageData pageData; List> futures = new ArrayList<>(); @@ -91,12 +104,14 @@ public class AlarmEdgeProcessor extends BaseAlarmProcessor { pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, originatorId, pageLink); if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { for (EdgeId relatedEdgeId : pageData.getData()) { - futures.add(saveEdgeEvent(tenantId, - relatedEdgeId, - EdgeEventType.ALARM, - actionType, - alarmId, - body)); + if (!relatedEdgeId.equals(sourceEdgeId)) { + futures.add(saveEdgeEvent(tenantId, + relatedEdgeId, + EdgeEventType.ALARM, + actionType, + alarmId, + body)); + } } if (pageData.hasNext()) { pageLink = pageLink.nextPageLink(); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java index fbe141ab32..2c3945346f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java @@ -40,7 +40,6 @@ import java.util.UUID; public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { public ListenableFuture processAlarmMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) { - log.trace("[{}] processAlarmMsg [{}]", tenantId, alarmUpdateMsg); EntityId originatorId = getAlarmOriginator(tenantId, alarmUpdateMsg.getOriginatorName(), EntityType.valueOf(alarmUpdateMsg.getOriginatorType())); AlarmId alarmId = new AlarmId(new UUID(alarmUpdateMsg.getIdMSB(), alarmUpdateMsg.getIdLSB())); @@ -49,7 +48,7 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { return Futures.immediateFuture(null); } try { - edgeSynchronizationManager.getSync().set(true); + switch (alarmUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: case ENTITY_UPDATED_RPC_MESSAGE: @@ -100,26 +99,11 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { } catch (Exception e) { log.error("[{}] Failed to process alarm update msg [{}]", tenantId, alarmUpdateMsg, e); return Futures.immediateFailedFuture(e); - } finally { - edgeSynchronizationManager.getSync().remove(); } return Futures.immediateFuture(null); } - private EntityId getAlarmOriginator(TenantId tenantId, String entityName, EntityType entityType) { - switch (entityType) { - case DEVICE: - return deviceService.findDeviceByTenantIdAndName(tenantId, entityName).getId(); - case ASSET: - return assetService.findAssetByTenantIdAndName(tenantId, entityName).getId(); - case ENTITY_VIEW: - return entityViewService.findEntityViewByTenantIdAndName(tenantId, entityName).getId(); - default: - return null; - } - } - - public AlarmUpdateMsg convertAlarmEventToAlarmMsg(TenantId tenantId, UUID entityId, EdgeEventActionType actionType, JsonNode body) { + protected AlarmUpdateMsg convertAlarmEventToAlarmMsg(TenantId tenantId, UUID entityId, EdgeEventActionType actionType, JsonNode body) { AlarmId alarmId = new AlarmId(entityId); UpdateMsgType msgType = getUpdateMsgType(actionType); switch (actionType) { @@ -138,4 +122,17 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { } return null; } + + private EntityId getAlarmOriginator(TenantId tenantId, String entityName, EntityType entityType) { + switch (entityType) { + case DEVICE: + return deviceService.findDeviceByTenantIdAndName(tenantId, entityName).getId(); + case ASSET: + return assetService.findAssetByTenantIdAndName(tenantId, entityName).getId(); + case ENTITY_VIEW: + return entityViewService.findEntityViewByTenantIdAndName(tenantId, entityName).getId(); + default: + return null; + } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java index 3e61a538c5..c36af23988 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java @@ -57,7 +57,7 @@ public class AssetEdgeProcessor extends BaseAssetProcessor { log.trace("[{}] executing processAssetMsgFromEdge [{}] from edge [{}]", tenantId, assetUpdateMsg, edge.getName()); AssetId assetId = new AssetId(new UUID(assetUpdateMsg.getIdMSB(), assetUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(true); + edgeSynchronizationManager.getSync().set(edge.getId()); switch (assetUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java index 3cae9c5123..c9f8d20f0c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java @@ -51,7 +51,7 @@ public class AssetProfileEdgeProcessor extends BaseAssetProfileProcessor { log.trace("[{}] executing processAssetProfileMsgFromEdge [{}] from edge [{}]", tenantId, assetProfileUpdateMsg, edge.getName()); AssetProfileId assetProfileId = new AssetProfileId(new UUID(assetProfileUpdateMsg.getIdMSB(), assetProfileUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(true); + edgeSynchronizationManager.getSync().set(edge.getId()); switch (assetProfileUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java index 59f12ec64b..a10f7cfbea 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java @@ -52,7 +52,7 @@ public class DashboardEdgeProcessor extends BaseDashboardProcessor { log.trace("[{}] executing processDashboardMsgFromEdge [{}] from edge [{}]", tenantId, dashboardUpdateMsg, edge.getName()); DashboardId dashboardId = new DashboardId(new UUID(dashboardUpdateMsg.getIdMSB(), dashboardUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(true); + edgeSynchronizationManager.getSync().set(edge.getId()); switch (dashboardUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/BaseDeviceProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/BaseDeviceProcessor.java index 580f752ac1..74112d1e6b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/BaseDeviceProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/BaseDeviceProcessor.java @@ -16,7 +16,6 @@ package org.thingsboard.server.service.edge.rpc.processor.device; import com.datastax.oss.driver.api.core.uuid.Uuids; -import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.util.Pair; @@ -104,34 +103,27 @@ public abstract class BaseDeviceProcessor extends BaseEdgeProcessor { return Pair.of(created, deviceNameUpdated); } - public ListenableFuture processDeviceCredentialsMsg(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) { - log.debug("[{}] Executing processDeviceCredentialsMsg, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg); + protected void updateDeviceCredentials(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) { DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB())); - return dbCallbackExecutorService.submit(() -> { - Device device = deviceService.findDeviceById(tenantId, deviceId); - if (device != null) { - log.debug("Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]", - device.getName(), deviceCredentialsUpdateMsg.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsValue()); - try { - edgeSynchronizationManager.getSync().set(true); + Device device = deviceService.findDeviceById(tenantId, deviceId); + if (device != null) { + log.debug("Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]", + device.getName(), deviceCredentialsUpdateMsg.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsValue()); + try { + DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(tenantId, device.getId()); + deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(deviceCredentialsUpdateMsg.getCredentialsType())); + deviceCredentials.setCredentialsId(deviceCredentialsUpdateMsg.getCredentialsId()); + deviceCredentials.setCredentialsValue(deviceCredentialsUpdateMsg.hasCredentialsValue() + ? deviceCredentialsUpdateMsg.getCredentialsValue() : null); + deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials); - DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(tenantId, device.getId()); - deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(deviceCredentialsUpdateMsg.getCredentialsType())); - deviceCredentials.setCredentialsId(deviceCredentialsUpdateMsg.getCredentialsId()); - deviceCredentials.setCredentialsValue(deviceCredentialsUpdateMsg.hasCredentialsValue() - ? deviceCredentialsUpdateMsg.getCredentialsValue() : null); - deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials); - } catch (Exception e) { - log.error("Can't update device credentials for device [{}], deviceCredentialsUpdateMsg [{}]", - device.getName(), deviceCredentialsUpdateMsg, e); - throw new RuntimeException(e); - } finally { - edgeSynchronizationManager.getSync().remove(); - } - } else { - log.warn("Can't find device by id [{}], deviceCredentialsUpdateMsg [{}]", deviceId, deviceCredentialsUpdateMsg); + } catch (Exception e) { + log.error("Can't update device credentials for device [{}], deviceCredentialsUpdateMsg [{}]", + device.getName(), deviceCredentialsUpdateMsg, e); + throw new RuntimeException(e); } - return null; - }); + } else { + log.warn("Can't find device by id [{}], deviceCredentialsUpdateMsg [{}]", deviceId, deviceCredentialsUpdateMsg); + } } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java index dfa5705840..7266dd5daf 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java @@ -35,10 +35,12 @@ import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.data.security.DeviceCredentials; +import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -66,7 +68,7 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor { log.trace("[{}] executing processDeviceMsgFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(true); + edgeSynchronizationManager.getSync().set(edge.getId()); switch (deviceUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: @@ -95,6 +97,18 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor { } } + public ListenableFuture processDeviceCredentialsMsgFromEdge(TenantId tenantId, EdgeId edgeId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) { + log.debug("[{}] Executing processDeviceCredentialsMsgFromEdge, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg); + try { + edgeSynchronizationManager.getSync().set(edgeId); + + updateDeviceCredentials(tenantId, deviceCredentialsUpdateMsg); + } finally { + edgeSynchronizationManager.getSync().remove(); + } + return Futures.immediateFuture(null); + } + private void saveOrUpdateDevice(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg, Edge edge) { CustomerId customerId = safeGetCustomerId(deviceUpdateMsg.getCustomerIdMSB(), deviceUpdateMsg.getCustomerIdLSB()); Pair resultPair = super.saveOrUpdateDevice(tenantId, deviceId, deviceUpdateMsg, customerId); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java index edaffd6f74..d5a585e587 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java @@ -52,7 +52,7 @@ public class DeviceProfileEdgeProcessor extends BaseDeviceProfileProcessor { log.trace("[{}] executing processDeviceProfileMsgFromEdge [{}] from edge [{}]", tenantId, deviceProfileUpdateMsg, edge.getName()); DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileUpdateMsg.getIdMSB(), deviceProfileUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(true); + edgeSynchronizationManager.getSync().set(edge.getId()); switch (deviceProfileUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java index e78b6b4ff5..ca14059883 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java @@ -55,7 +55,7 @@ public class EntityViewEdgeProcessor extends BaseEntityViewProcessor { log.trace("[{}] executing processEntityViewMsgFromEdge [{}] from edge [{}]", tenantId, entityViewUpdateMsg, edge.getName()); EntityViewId entityViewId = new EntityViewId(new UUID(entityViewUpdateMsg.getIdMSB(), entityViewUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(true); + edgeSynchronizationManager.getSync().set(edge.getId()); switch (entityViewUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java index 9038cc9c33..e3e5e409c9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java @@ -34,9 +34,7 @@ import java.util.UUID; public abstract class BaseRelationProcessor extends BaseEdgeProcessor { public ListenableFuture processRelationMsg(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) { - log.trace("[{}] processRelationMsg [{}]", tenantId, relationUpdateMsg); try { - edgeSynchronizationManager.getSync().set(true); EntityRelation entityRelation = new EntityRelation(); UUID fromUUID = new UUID(relationUpdateMsg.getFromIdMSB(), relationUpdateMsg.getFromIdLSB()); @@ -72,8 +70,6 @@ public abstract class BaseRelationProcessor extends BaseEdgeProcessor { } catch (Exception e) { log.error("[{}] Failed to process relation update msg [{}]", tenantId, relationUpdateMsg, e); return Futures.immediateFailedFuture(e); - } finally { - edgeSynchronizationManager.getSync().remove(); } return Futures.immediateFuture(null); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java index 48192a3973..a0599c334f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java @@ -23,6 +23,7 @@ import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; @@ -45,6 +46,17 @@ import java.util.Set; @TbCoreComponent public class RelationEdgeProcessor extends BaseRelationProcessor { + public ListenableFuture processRelationMsgFromEdge(TenantId tenantId, Edge edge, RelationUpdateMsg relationUpdateMsg) { + log.trace("[{}] executing processRelationMsgFromEdge [{}] from edge [{}]", tenantId, relationUpdateMsg, edge.getName()); + try { + edgeSynchronizationManager.getSync().set(edge.getId()); + + return processRelationMsg(tenantId, relationUpdateMsg); + } finally { + edgeSynchronizationManager.getSync().remove(); + } + } + public DownlinkMsg convertRelationEventToDownlink(EdgeEvent edgeEvent) { EntityRelation entityRelation = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), EntityRelation.class); UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 53ef38d96a..f82730435c 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -475,7 +475,7 @@ public class DefaultTbClusterService implements TbClusterService { } @Override - public void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action) { + public void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action, EdgeId sourceEdgeId) { if (!edgesEnabled) { return; } @@ -508,6 +508,10 @@ public class DefaultTbClusterService implements TbClusterService { if (body != null) { builder.setBody(body); } + if (sourceEdgeId != null) { + builder.setSourceEdgeIdMSB(sourceEdgeId.getId().getMostSignificantBits()); + builder.setSourceEdgeIdLSB(sourceEdgeId.getId().getLeastSignificantBits()); + } TransportProtos.EdgeNotificationMsgProto msg = builder.build(); log.trace("[{}] sending notification to edge service {}", tenantId.getId(), msg); pushMsgToCore(tenantId, entityId != null ? entityId : tenantId, TransportProtos.ToCoreMsg.newBuilder().setEdgeNotificationMsg(msg).build(), null); diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java index 71c244890d..d9bc42db9a 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java @@ -91,7 +91,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { int cntTime = 1; Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.isNull(), Mockito.isNull(), Mockito.any(), Mockito.eq(EdgeEventType.RELATION), - Mockito.eq(edgeTypeByActionType(actionType))); + Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any()); ArgumentMatcher matcherOriginatorId = argument -> argument.equals(relation.getTo()); ArgumentMatcher matcherEntityClassEquals = Objects::isNull; ArgumentMatcher matcherCustomerId = customerId == null ? @@ -111,7 +111,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { ActionType actionType, int cntTime) { Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.isNull(), Mockito.isNull(), Mockito.any(), Mockito.eq(EdgeEventType.RELATION), - Mockito.eq(edgeTypeByActionType(actionType))); + Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any()); ArgumentMatcher matcherOriginatorId = argument -> argument.getClass().equals(relation.getFrom().getClass()); ArgumentMatcher matcherEntityClassEquals = Objects::isNull; ArgumentMatcher matcherCustomerId = customerId == null ? @@ -318,13 +318,13 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { private void testNotificationMsgToEdgeServiceNeverWithActionType(EntityId entityId, ActionType actionType) { EdgeEventActionType edgeEventActionType = ActionType.CREDENTIALS_UPDATED.equals(actionType) ? EdgeEventActionType.CREDENTIALS_UPDATED : edgeTypeByActionType(actionType); - Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(), - Mockito.any(), Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.eq(edgeEventActionType)); + Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(), Mockito.any(), + Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.eq(edgeEventActionType), Mockito.any()); } private void testNotificationMsgToEdgeServiceNever(EntityId entityId) { - Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(), - Mockito.any(), Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(), Mockito.any(), + Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); } private void testLogEntityActionNever(EntityId entityId, HasName entity) { @@ -358,13 +358,13 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { argument -> argument.getClass().equals(entityId.getClass()); Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.any(), Mockito.argThat(matcherEntityId), Mockito.any(), Mockito.isNull(), - Mockito.eq(edgeEventActionType)); + Mockito.eq(edgeEventActionType), Mockito.any()); } private void testSendNotificationMsgToEdgeServiceTimeEntityEqAny(TenantId tenantId, ActionType actionType, int cntTime) { Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.any(), Mockito.any(EntityId.class), Mockito.any(), Mockito.isNull(), - Mockito.eq(edgeTypeByActionType(actionType))); + Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any()); } protected void testBroadcastEntityStateChangeEventTime(EntityId entityId, TenantId tenantId, int cntTime) { diff --git a/application/src/test/java/org/thingsboard/server/edge/AssetEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AssetEdgeTest.java index faf0896b81..b7e23999f6 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AssetEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AssetEdgeTest.java @@ -213,7 +213,7 @@ public class AssetEdgeTest extends AbstractEdgeTest { testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); edgeImitator.expectResponsesAmount(1); - edgeImitator.expectMessageAmount(1); + edgeImitator.expectMessageAmount(2); edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index 15b15c0be0..4e51e3e5b3 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -548,16 +548,16 @@ public class DeviceEdgeTest extends AbstractEdgeTest { uplinkMsgBuilder.addDeviceUpdateMsg(deviceUpdateMsgBuilder.build()); edgeImitator.expectResponsesAmount(1); - edgeImitator.expectMessageAmount(1); + edgeImitator.expectMessageAmount(2); edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); Assert.assertTrue(edgeImitator.waitForResponses()); Assert.assertTrue(edgeImitator.waitForMessages()); - AbstractMessage latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof DeviceCredentialsRequestMsg); - DeviceCredentialsRequestMsg latestDeviceCredentialsRequestMsg = (DeviceCredentialsRequestMsg) latestMessage; + Optional deviceCredentialsRequestMsgOpt = edgeImitator.findMessageByType(DeviceCredentialsRequestMsg.class); + Assert.assertTrue(deviceCredentialsRequestMsgOpt.isPresent()); + DeviceCredentialsRequestMsg latestDeviceCredentialsRequestMsg = deviceCredentialsRequestMsgOpt.get(); Assert.assertEquals(uuid.getMostSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdMSB()); Assert.assertEquals(uuid.getLeastSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB()); diff --git a/application/src/test/java/org/thingsboard/server/service/sync/ie/ExportImportServiceSqlTest.java b/application/src/test/java/org/thingsboard/server/service/sync/ie/ExportImportServiceSqlTest.java index 3361d516f0..c8af9c7521 100644 --- a/application/src/test/java/org/thingsboard/server/service/sync/ie/ExportImportServiceSqlTest.java +++ b/application/src/test/java/org/thingsboard/server/service/sync/ie/ExportImportServiceSqlTest.java @@ -555,7 +555,7 @@ public class ExportImportServiceSqlTest extends BaseExportImportServiceTest { Customer updatedCustomer = importEntity(tenantAdmin2, updatedCustomerEntity).getSavedEntity(); verify(entityActionService).logEntityAction(any(), eq(importedCustomer.getId()), eq(updatedCustomer), any(), eq(ActionType.UPDATED), isNull()); - verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedCustomer.getId()), any(), any(), eq(EdgeEventActionType.UPDATED)); + verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedCustomer.getId()), any(), any(), eq(EdgeEventActionType.UPDATED), any()); Mockito.reset(entityActionService); @@ -572,7 +572,7 @@ public class ExportImportServiceSqlTest extends BaseExportImportServiceTest { verify(entityActionService).logEntityAction(any(), eq(importedAssetProfile.getId()), eq(importedAssetProfile), any(), eq(ActionType.ADDED), isNull()); verify(tbClusterService).broadcastEntityStateChangeEvent(any(), eq(importedAssetProfile.getId()), eq(ComponentLifecycleEvent.CREATED)); - verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedAssetProfile.getId()), any(), any(), eq(EdgeEventActionType.ADDED)); + verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedAssetProfile.getId()), any(), any(), eq(EdgeEventActionType.ADDED), any()); Asset importedAsset = (Asset) importEntity(tenantAdmin2, getAndClone(entitiesExportData, EntityType.ASSET)).getSavedEntity(); verify(entityActionService).logEntityAction(any(), eq(importedAsset.getId()), eq(importedAsset), @@ -588,14 +588,14 @@ public class ExportImportServiceSqlTest extends BaseExportImportServiceTest { verify(entityActionService).logEntityAction(any(), eq(importedAsset.getId()), eq(updatedAsset), any(), eq(ActionType.UPDATED), isNull()); - verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedAsset.getId()), any(), any(), eq(EdgeEventActionType.UPDATED)); + verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedAsset.getId()), any(), any(), eq(EdgeEventActionType.UPDATED), any()); DeviceProfile importedDeviceProfile = (DeviceProfile) importEntity(tenantAdmin2, getAndClone(entitiesExportData, EntityType.DEVICE_PROFILE)).getSavedEntity(); verify(entityActionService).logEntityAction(any(), eq(importedDeviceProfile.getId()), eq(importedDeviceProfile), any(), eq(ActionType.ADDED), isNull()); verify(tbClusterService).onDeviceProfileChange(eq(importedDeviceProfile), any()); verify(tbClusterService).broadcastEntityStateChangeEvent(any(), eq(importedDeviceProfile.getId()), eq(ComponentLifecycleEvent.CREATED)); - verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedDeviceProfile.getId()), any(), any(), eq(EdgeEventActionType.ADDED)); + verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedDeviceProfile.getId()), any(), any(), eq(EdgeEventActionType.ADDED), any()); verify(otaPackageStateService).update(eq(importedDeviceProfile), eq(false), eq(false)); Device importedDevice = (Device) importEntity(tenantAdmin2, getAndClone(entitiesExportData, EntityType.DEVICE)).getSavedEntity(); diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java index e81aaed39d..270f971584 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java @@ -92,6 +92,6 @@ public interface TbClusterService extends TbQueueClusterService { void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse); - void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action); + void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action, EdgeId sourceEdgeId); } diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 78614911d4..9e93c9c0b0 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -744,6 +744,8 @@ message EdgeNotificationMsgProto { string body = 10; PostTelemetryMsg postTelemetryMsg = 11; PostAttributeMsg postAttributesMsg = 12; + int64 sourceEdgeIdMSB = 13; + int64 sourceEdgeIdLSB = 14; } /** diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeSynchronizationManager.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeSynchronizationManager.java index 8cce581098..fe83234565 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeSynchronizationManager.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeSynchronizationManager.java @@ -15,9 +15,11 @@ */ package org.thingsboard.server.dao.edge; +import org.thingsboard.server.common.data.id.EdgeId; + public interface EdgeSynchronizationManager { - ThreadLocal getSync(); + ThreadLocal getSync(); - boolean isSync(); + EdgeId getEdgeId(); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/DefaultEdgeSynchronizationManager.java b/dao/src/main/java/org/thingsboard/server/dao/edge/DefaultEdgeSynchronizationManager.java index 59fd2326d8..4712f11529 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/DefaultEdgeSynchronizationManager.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/DefaultEdgeSynchronizationManager.java @@ -18,17 +18,17 @@ package org.thingsboard.server.dao.edge; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.id.EdgeId; @Component @Slf4j public class DefaultEdgeSynchronizationManager implements EdgeSynchronizationManager { @Getter - private final ThreadLocal sync = new ThreadLocal<>(); + private final ThreadLocal sync = new ThreadLocal<>(); @Override - public boolean isSync() { - Boolean sync = this.sync.get(); - return sync != null && sync; + public EdgeId getEdgeId() { + return this.sync.get(); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/eventsourcing/DeleteEntityEvent.java b/dao/src/main/java/org/thingsboard/server/dao/eventsourcing/DeleteEntityEvent.java index 6277b7910b..58e632ce73 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/eventsourcing/DeleteEntityEvent.java +++ b/dao/src/main/java/org/thingsboard/server/dao/eventsourcing/DeleteEntityEvent.java @@ -26,6 +26,5 @@ import org.thingsboard.server.common.data.id.TenantId; public class DeleteEntityEvent { private final TenantId tenantId; private final EntityId entityId; - private final EdgeId edgeId; private final T entity; } From d5a654655406c87524e88b717d79febe2ed27e20 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Tue, 5 Sep 2023 10:27:13 +0300 Subject: [PATCH 2/4] Fix tests for Device and Asset on edge --- .../test/java/org/thingsboard/server/edge/AssetEdgeTest.java | 2 +- .../test/java/org/thingsboard/server/edge/DeviceEdgeTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/edge/AssetEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AssetEdgeTest.java index b7e23999f6..faf0896b81 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AssetEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AssetEdgeTest.java @@ -213,7 +213,7 @@ public class AssetEdgeTest extends AbstractEdgeTest { testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); edgeImitator.expectResponsesAmount(1); - edgeImitator.expectMessageAmount(2); + edgeImitator.expectMessageAmount(1); edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index 4e51e3e5b3..5b3b5a04e8 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -548,7 +548,7 @@ public class DeviceEdgeTest extends AbstractEdgeTest { uplinkMsgBuilder.addDeviceUpdateMsg(deviceUpdateMsgBuilder.build()); edgeImitator.expectResponsesAmount(1); - edgeImitator.expectMessageAmount(2); + edgeImitator.expectMessageAmount(1); edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); From 919e0c5081ac3a41a0f89a75a6246f8dda1cb670 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Thu, 12 Oct 2023 15:55:39 +0300 Subject: [PATCH 3/4] Code review. Minor refactoring: Sync -> EdgeId. Logging updates. Avoid usage of OBJECT_MAPPER. --- .../edge/EdgeEventSourcingListener.java | 8 ++++---- .../AbstractRuleChainMetadataConstructor.java | 19 +++++++++---------- .../RuleChainMetadataConstructorV330.java | 3 +-- .../RuleChainMetadataConstructorV340.java | 3 +-- .../fetch/AdminSettingsEdgeEventFetcher.java | 10 +++++----- .../edge/rpc/processor/BaseEdgeProcessor.java | 2 ++ .../processor/alarm/AlarmEdgeProcessor.java | 6 +++--- .../processor/alarm/BaseAlarmProcessor.java | 2 +- .../processor/asset/AssetEdgeProcessor.java | 6 +++--- .../asset/AssetProfileEdgeProcessor.java | 6 +++--- .../dashboard/DashboardEdgeProcessor.java | 6 +++--- .../processor/device/DeviceEdgeProcessor.java | 10 +++++----- .../device/DeviceProfileEdgeProcessor.java | 6 +++--- .../entityview/EntityViewEdgeProcessor.java | 6 +++--- .../relation/RelationEdgeProcessor.java | 10 +++++----- .../settings/AdminSettingsEdgeProcessor.java | 5 ++++- .../rpc/sync/DefaultEdgeRequestsService.java | 4 ++-- .../dao/edge/EdgeSynchronizationManager.java | 4 +--- .../DefaultEdgeSynchronizationManager.java | 7 +------ .../rule/engine/rpc/TbSendRPCReplyNode.java | 2 +- 20 files changed, 60 insertions(+), 65 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index fb1e17493b..d6cac66759 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -77,7 +77,7 @@ public class EdgeEventSourcingListener { log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event); EdgeEventActionType action = Boolean.TRUE.equals(event.getAdded()) ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED; tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), - null, null, action, edgeSynchronizationManager.getEdgeId()); + null, null, action, edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event); } @@ -89,7 +89,7 @@ public class EdgeEventSourcingListener { log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED, - edgeSynchronizationManager.getEdgeId()); + edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event); } @@ -101,7 +101,7 @@ public class EdgeEventSourcingListener { log.trace("[{}] ActionEntityEvent called: {}", event.getTenantId(), event); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(), event.getBody(), null, edgeTypeByActionType(event.getActionType()), - edgeSynchronizationManager.getEdgeId()); + edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event); } @@ -122,7 +122,7 @@ public class EdgeEventSourcingListener { log.trace("[{}] RelationActionEvent called: {}", event.getTenantId(), event); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, null, JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType()), - edgeSynchronizationManager.getEdgeId()); + edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/AbstractRuleChainMetadataConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/AbstractRuleChainMetadataConstructor.java index f33f0f84e8..a720d0aab6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/AbstractRuleChainMetadataConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/AbstractRuleChainMetadataConstructor.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.edge.rpc.constructor.rule; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -50,7 +49,7 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM constructRuleChainMetadataUpdatedMsg(tenantId, builder, ruleChainMetaData); builder.setMsgType(msgType); return builder.build(); - } catch (JsonProcessingException ex) { + } catch (Exception ex) { log.error("[{}] Can't construct RuleChainMetadataUpdateMsg", tenantId, ex); } return null; @@ -58,7 +57,7 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM protected abstract void constructRuleChainMetadataUpdatedMsg(TenantId tenantId, RuleChainMetadataUpdateMsg.Builder builder, - RuleChainMetaData ruleChainMetaData) throws JsonProcessingException; + RuleChainMetaData ruleChainMetaData); protected List constructConnections(List connections) { List result = new ArrayList<>(); @@ -78,7 +77,7 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM .build(); } - protected List constructNodes(List nodes) throws JsonProcessingException { + protected List constructNodes(List nodes) { List result = new ArrayList<>(); if (nodes != null && !nodes.isEmpty()) { for (RuleNode node : nodes) { @@ -88,22 +87,22 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM return result; } - private RuleNodeProto constructNode(RuleNode node) throws JsonProcessingException { + private RuleNodeProto constructNode(RuleNode node) { return RuleNodeProto.newBuilder() .setIdMSB(node.getId().getId().getMostSignificantBits()) .setIdLSB(node.getId().getId().getLeastSignificantBits()) .setType(node.getType()) .setName(node.getName()) .setDebugMode(node.isDebugMode()) - .setConfiguration(JacksonUtil.OBJECT_MAPPER.writeValueAsString(node.getConfiguration())) - .setAdditionalInfo(JacksonUtil.OBJECT_MAPPER.writeValueAsString(node.getAdditionalInfo())) + .setConfiguration(JacksonUtil.toString(node.getConfiguration())) + .setAdditionalInfo(JacksonUtil.toString(node.getAdditionalInfo())) .setSingletonMode(node.isSingletonMode()) .setConfigurationVersion(node.getConfigurationVersion()) .build(); } protected List constructRuleChainConnections(List ruleChainConnections, - NavigableSet removedNodeIndexes) throws JsonProcessingException { + NavigableSet removedNodeIndexes) { List result = new ArrayList<>(); if (ruleChainConnections != null && !ruleChainConnections.isEmpty()) { for (RuleChainConnectionInfo ruleChainConnectionInfo : ruleChainConnections) { @@ -127,13 +126,13 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM return result; } - private RuleChainConnectionInfoProto constructRuleChainConnection(RuleChainConnectionInfo ruleChainConnectionInfo) throws JsonProcessingException { + private RuleChainConnectionInfoProto constructRuleChainConnection(RuleChainConnectionInfo ruleChainConnectionInfo) { return RuleChainConnectionInfoProto.newBuilder() .setFromIndex(ruleChainConnectionInfo.getFromIndex()) .setTargetRuleChainIdMSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getMostSignificantBits()) .setTargetRuleChainIdLSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getLeastSignificantBits()) .setType(ruleChainConnectionInfo.getType()) - .setAdditionalInfo(JacksonUtil.OBJECT_MAPPER.writeValueAsString(ruleChainConnectionInfo.getAdditionalInfo())) + .setAdditionalInfo(JacksonUtil.toString(ruleChainConnectionInfo.getAdditionalInfo())) .build(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/RuleChainMetadataConstructorV330.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/RuleChainMetadataConstructorV330.java index 5e1310a5a5..33fc39b7e7 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/RuleChainMetadataConstructorV330.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/RuleChainMetadataConstructorV330.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.edge.rpc.constructor.rule; -import com.fasterxml.jackson.core.JsonProcessingException; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.flow.TbRuleChainInputNode; @@ -45,7 +44,7 @@ public class RuleChainMetadataConstructorV330 extends AbstractRuleChainMetadataC @Override protected void constructRuleChainMetadataUpdatedMsg(TenantId tenantId, RuleChainMetadataUpdateMsg.Builder builder, - RuleChainMetaData ruleChainMetaData) throws JsonProcessingException { + RuleChainMetaData ruleChainMetaData) { List supportedNodes = filterNodes(ruleChainMetaData.getNodes()); NavigableSet removedNodeIndexes = getRemovedNodeIndexes(ruleChainMetaData.getNodes(), ruleChainMetaData.getConnections()); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/RuleChainMetadataConstructorV340.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/RuleChainMetadataConstructorV340.java index 6fe39942ad..9a37e416db 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/RuleChainMetadataConstructorV340.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/rule/RuleChainMetadataConstructorV340.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.edge.rpc.constructor.rule; -import com.fasterxml.jackson.core.JsonProcessingException; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.rule.RuleChainMetaData; @@ -29,7 +28,7 @@ public class RuleChainMetadataConstructorV340 extends AbstractRuleChainMetadataC @Override protected void constructRuleChainMetadataUpdatedMsg(TenantId tenantId, RuleChainMetadataUpdateMsg.Builder builder, - RuleChainMetaData ruleChainMetaData) throws JsonProcessingException { + RuleChainMetaData ruleChainMetaData) { builder.addAllNodes(constructNodes(ruleChainMetaData.getNodes())) .addAllConnections(constructConnections(ruleChainMetaData.getConnections())) .addAllRuleChainConnections(constructRuleChainConnections(ruleChainMetaData.getRuleChainConnections(), new TreeSet<>())); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java index b9fc1c8c9e..e3a8c14012 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java @@ -79,19 +79,19 @@ public class AdminSettingsEdgeEventFetcher implements EdgeEventFetcher { AdminSettings systemMailSettings = adminSettingsService.findAdminSettingsByKey(TenantId.SYS_TENANT_ID, "mail"); result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, - EdgeEventActionType.UPDATED, null, JacksonUtil.OBJECT_MAPPER.valueToTree(systemMailSettings))); + EdgeEventActionType.UPDATED, null, JacksonUtil.valueToTree(systemMailSettings))); AdminSettings tenantMailSettings = convertToTenantAdminSettings(tenantId, systemMailSettings.getKey(), (ObjectNode) systemMailSettings.getJsonValue()); result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, - EdgeEventActionType.UPDATED, null, JacksonUtil.OBJECT_MAPPER.valueToTree(tenantMailSettings))); + EdgeEventActionType.UPDATED, null, JacksonUtil.valueToTree(tenantMailSettings))); AdminSettings systemMailTemplates = loadMailTemplates(tenantId); result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, - EdgeEventActionType.UPDATED, null, JacksonUtil.OBJECT_MAPPER.valueToTree(systemMailTemplates))); + EdgeEventActionType.UPDATED, null, JacksonUtil.valueToTree(systemMailTemplates))); AdminSettings tenantMailTemplates = convertToTenantAdminSettings(tenantId, systemMailTemplates.getKey(), (ObjectNode) systemMailTemplates.getJsonValue()); result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, - EdgeEventActionType.UPDATED, null, JacksonUtil.OBJECT_MAPPER.valueToTree(tenantMailTemplates))); + EdgeEventActionType.UPDATED, null, JacksonUtil.valueToTree(tenantMailTemplates))); // return PageData object to be in sync with other fetchers return new PageData<>(result, 1, result.size(), false); @@ -114,7 +114,7 @@ public class AdminSettingsEdgeEventFetcher implements EdgeEventFetcher { AdminSettings adminSettings = new AdminSettings(); adminSettings.setId(new AdminSettingsId(Uuids.timeBased())); adminSettings.setKey("mailTemplates"); - adminSettings.setJsonValue(JacksonUtil.OBJECT_MAPPER.convertValue(mailTemplates, JsonNode.class)); + adminSettings.setJsonValue(JacksonUtil.convertValue(mailTemplates, JsonNode.class)); return adminSettings; } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 9b964ed922..216278b9f9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -431,6 +431,8 @@ public abstract class BaseEdgeProcessor { return Futures.immediateFuture(null); } }, dbCallbackExecutorService); + } else { + return Futures.immediateFuture(null); } default: return Futures.immediateFuture(null); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java index 9e8871e5e6..b76e8f187d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java @@ -50,10 +50,10 @@ public class AlarmEdgeProcessor extends BaseAlarmProcessor { public ListenableFuture processAlarmMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmUpdateMsg alarmUpdateMsg) { log.trace("[{}] processAlarmMsgFromEdge [{}]", tenantId, alarmUpdateMsg); try { - edgeSynchronizationManager.getSync().set(edgeId); + edgeSynchronizationManager.getEdgeId().set(edgeId); return processAlarmMsg(tenantId, alarmUpdateMsg); } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } } @@ -77,7 +77,7 @@ public class AlarmEdgeProcessor extends BaseAlarmProcessor { case DELETED: Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), Alarm.class); List> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(), - alarmId, actionType, JacksonUtil.OBJECT_MAPPER.valueToTree(deletedAlarm), sourceEdgeId); + alarmId, actionType, JacksonUtil.valueToTree(deletedAlarm), sourceEdgeId); return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); default: ListenableFuture alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java index 7dbff33d1d..03a91ae2f0 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java @@ -123,7 +123,7 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { } break; case DELETED: - Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.convertValue(body, Alarm.class); + Alarm deletedAlarm = JacksonUtil.convertValue(body, Alarm.class); return alarmMsgConstructor.constructAlarmUpdatedMsg(msgType, deletedAlarm, findOriginatorEntityName(tenantId, deletedAlarm)); } return null; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java index e3570962f2..caa03af8a5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java @@ -50,10 +50,10 @@ import java.util.UUID; public class AssetEdgeProcessor extends BaseAssetProcessor { public ListenableFuture processAssetMsgFromEdge(TenantId tenantId, Edge edge, AssetUpdateMsg assetUpdateMsg) { - log.trace("[{}] executing processAssetMsgFromEdge [{}] from edge [{}]", tenantId, assetUpdateMsg, edge.getName()); + log.trace("[{}] executing processAssetMsgFromEdge [{}] from edge [{}]", tenantId, assetUpdateMsg, edge.getId()); AssetId assetId = new AssetId(new UUID(assetUpdateMsg.getIdMSB(), assetUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(edge.getId()); + edgeSynchronizationManager.getEdgeId().set(edge.getId()); switch (assetUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: @@ -78,7 +78,7 @@ public class AssetEdgeProcessor extends BaseAssetProcessor { return Futures.immediateFailedFuture(e); } } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java index 83d23f50d7..fb55edd8c5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java @@ -49,10 +49,10 @@ import java.util.UUID; public class AssetProfileEdgeProcessor extends BaseAssetProfileProcessor { public ListenableFuture processAssetProfileMsgFromEdge(TenantId tenantId, Edge edge, AssetProfileUpdateMsg assetProfileUpdateMsg) { - log.trace("[{}] executing processAssetProfileMsgFromEdge [{}] from edge [{}]", tenantId, assetProfileUpdateMsg, edge.getName()); + log.trace("[{}] executing processAssetProfileMsgFromEdge [{}] from edge [{}]", tenantId, assetProfileUpdateMsg, edge.getId()); AssetProfileId assetProfileId = new AssetProfileId(new UUID(assetProfileUpdateMsg.getIdMSB(), assetProfileUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(edge.getId()); + edgeSynchronizationManager.getEdgeId().set(edge.getId()); switch (assetProfileUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: @@ -68,7 +68,7 @@ public class AssetProfileEdgeProcessor extends BaseAssetProfileProcessor { log.warn("[{}] Failed to process AssetProfileUpdateMsg from Edge [{}]", tenantId, assetProfileUpdateMsg, e); return Futures.immediateFailedFuture(e); } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java index 69dc3c785d..291491a2fe 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java @@ -45,10 +45,10 @@ import java.util.UUID; public class DashboardEdgeProcessor extends BaseDashboardProcessor { public ListenableFuture processDashboardMsgFromEdge(TenantId tenantId, Edge edge, DashboardUpdateMsg dashboardUpdateMsg) { - log.trace("[{}] executing processDashboardMsgFromEdge [{}] from edge [{}]", tenantId, dashboardUpdateMsg, edge.getName()); + log.trace("[{}] executing processDashboardMsgFromEdge [{}] from edge [{}]", tenantId, dashboardUpdateMsg, edge.getId()); DashboardId dashboardId = new DashboardId(new UUID(dashboardUpdateMsg.getIdMSB(), dashboardUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(edge.getId()); + edgeSynchronizationManager.getEdgeId().set(edge.getId()); switch (dashboardUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: @@ -73,7 +73,7 @@ public class DashboardEdgeProcessor extends BaseDashboardProcessor { return Futures.immediateFailedFuture(e); } } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java index 97f37ba833..ad152f754b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java @@ -66,10 +66,10 @@ import java.util.UUID; public class DeviceEdgeProcessor extends BaseDeviceProcessor { public ListenableFuture processDeviceMsgFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { - log.trace("[{}] executing processDeviceMsgFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); + log.trace("[{}] executing processDeviceMsgFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getId()); DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(edge.getId()); + edgeSynchronizationManager.getEdgeId().set(edge.getId()); switch (deviceUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: @@ -94,18 +94,18 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor { return Futures.immediateFailedFuture(e); } } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } } public ListenableFuture processDeviceCredentialsMsgFromEdge(TenantId tenantId, EdgeId edgeId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) { log.debug("[{}] Executing processDeviceCredentialsMsgFromEdge, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg); try { - edgeSynchronizationManager.getSync().set(edgeId); + edgeSynchronizationManager.getEdgeId().set(edgeId); updateDeviceCredentials(tenantId, deviceCredentialsUpdateMsg); } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } return Futures.immediateFuture(null); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java index 4267adb8e2..3ed75bdb41 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java @@ -49,10 +49,10 @@ import java.util.UUID; public class DeviceProfileEdgeProcessor extends BaseDeviceProfileProcessor { public ListenableFuture processDeviceProfileMsgFromEdge(TenantId tenantId, Edge edge, DeviceProfileUpdateMsg deviceProfileUpdateMsg) { - log.trace("[{}] executing processDeviceProfileMsgFromEdge [{}] from edge [{}]", tenantId, deviceProfileUpdateMsg, edge.getName()); + log.trace("[{}] executing processDeviceProfileMsgFromEdge [{}] from edge [{}]", tenantId, deviceProfileUpdateMsg, edge.getId()); DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileUpdateMsg.getIdMSB(), deviceProfileUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(edge.getId()); + edgeSynchronizationManager.getEdgeId().set(edge.getId()); switch (deviceProfileUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: @@ -68,7 +68,7 @@ public class DeviceProfileEdgeProcessor extends BaseDeviceProfileProcessor { log.warn("[{}] Failed to process DeviceProfileUpdateMsg from Edge [{}]", tenantId, deviceProfileUpdateMsg, e); return Futures.immediateFailedFuture(e); } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java index e139bf28cc..cd3a0b35ad 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java @@ -46,10 +46,10 @@ import java.util.UUID; public class EntityViewEdgeProcessor extends BaseEntityViewProcessor { public ListenableFuture processEntityViewMsgFromEdge(TenantId tenantId, Edge edge, EntityViewUpdateMsg entityViewUpdateMsg) { - log.trace("[{}] executing processEntityViewMsgFromEdge [{}] from edge [{}]", tenantId, entityViewUpdateMsg, edge.getName()); + log.trace("[{}] executing processEntityViewMsgFromEdge [{}] from edge [{}]", tenantId, entityViewUpdateMsg, edge.getId()); EntityViewId entityViewId = new EntityViewId(new UUID(entityViewUpdateMsg.getIdMSB(), entityViewUpdateMsg.getIdLSB())); try { - edgeSynchronizationManager.getSync().set(edge.getId()); + edgeSynchronizationManager.getEdgeId().set(edge.getId()); switch (entityViewUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: @@ -74,7 +74,7 @@ public class EntityViewEdgeProcessor extends BaseEntityViewProcessor { return Futures.immediateFailedFuture(e); } } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java index a0599c334f..2c23fd42b2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java @@ -47,18 +47,18 @@ import java.util.Set; public class RelationEdgeProcessor extends BaseRelationProcessor { public ListenableFuture processRelationMsgFromEdge(TenantId tenantId, Edge edge, RelationUpdateMsg relationUpdateMsg) { - log.trace("[{}] executing processRelationMsgFromEdge [{}] from edge [{}]", tenantId, relationUpdateMsg, edge.getName()); + log.trace("[{}] executing processRelationMsgFromEdge [{}] from edge [{}]", tenantId, relationUpdateMsg, edge.getId()); try { - edgeSynchronizationManager.getSync().set(edge.getId()); + edgeSynchronizationManager.getEdgeId().set(edge.getId()); return processRelationMsg(tenantId, relationUpdateMsg); } finally { - edgeSynchronizationManager.getSync().remove(); + edgeSynchronizationManager.getEdgeId().remove(); } } public DownlinkMsg convertRelationEventToDownlink(EdgeEvent edgeEvent) { - EntityRelation entityRelation = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), EntityRelation.class); + EntityRelation entityRelation = JacksonUtil.convertValue(edgeEvent.getBody(), EntityRelation.class); UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); RelationUpdateMsg relationUpdateMsg = relationMsgConstructor.constructRelationUpdatedMsg(msgType, entityRelation); return DownlinkMsg.newBuilder() @@ -87,7 +87,7 @@ public class RelationEdgeProcessor extends BaseRelationProcessor { EdgeEventType.RELATION, EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()), null, - JacksonUtil.OBJECT_MAPPER.valueToTree(relation))); + JacksonUtil.valueToTree(relation))); } return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/settings/AdminSettingsEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/settings/AdminSettingsEdgeProcessor.java index 6bd21e0796..bbd9edd172 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/settings/AdminSettingsEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/settings/AdminSettingsEdgeProcessor.java @@ -32,7 +32,10 @@ import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; public class AdminSettingsEdgeProcessor extends BaseEdgeProcessor { public DownlinkMsg convertAdminSettingsEventToDownlink(EdgeEvent edgeEvent) { - AdminSettings adminSettings = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), AdminSettings.class); + AdminSettings adminSettings = JacksonUtil.convertValue(edgeEvent.getBody(), AdminSettings.class); + if (adminSettings == null) { + return null; + } AdminSettingsUpdateMsg adminSettingsUpdateMsg = adminSettingsMsgConstructor.constructAdminSettingsUpdateMsg(adminSettings); return DownlinkMsg.newBuilder() .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index 46bb940da9..af9f72e819 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -176,7 +176,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { if (attributes.size() > 0) { entityData.put("kv", attributes); entityData.put("scope", scope); - JsonNode body = JacksonUtil.OBJECT_MAPPER.valueToTree(entityData); + JsonNode body = JacksonUtil.valueToTree(entityData); log.debug("[{}] Sending attributes data msg, entityId [{}], attributes [{}]", tenantId, entityId, body); future = saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body); } else { @@ -249,7 +249,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { EdgeEventType.RELATION, EdgeEventActionType.ADDED, null, - JacksonUtil.OBJECT_MAPPER.valueToTree(relation))); + JacksonUtil.valueToTree(relation))); } } catch (Exception e) { String errMsg = String.format("[%s][%s] Exception during loading relation [%s] to edge on sync!", tenantId, edge.getId(), relation); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeSynchronizationManager.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeSynchronizationManager.java index fe83234565..812b30dc9d 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeSynchronizationManager.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeSynchronizationManager.java @@ -19,7 +19,5 @@ import org.thingsboard.server.common.data.id.EdgeId; public interface EdgeSynchronizationManager { - ThreadLocal getSync(); - - EdgeId getEdgeId(); + ThreadLocal getEdgeId(); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/DefaultEdgeSynchronizationManager.java b/dao/src/main/java/org/thingsboard/server/dao/edge/DefaultEdgeSynchronizationManager.java index 4712f11529..3d7c48efbd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/DefaultEdgeSynchronizationManager.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/DefaultEdgeSynchronizationManager.java @@ -25,10 +25,5 @@ import org.thingsboard.server.common.data.id.EdgeId; public class DefaultEdgeSynchronizationManager implements EdgeSynchronizationManager { @Getter - private final ThreadLocal sync = new ThreadLocal<>(); - - @Override - public EdgeId getEdgeId() { - return this.sync.get(); - } + private final ThreadLocal edgeId = new ThreadLocal<>(); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java index 373e314577..66a26e33fa 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNode.java @@ -104,7 +104,7 @@ public class TbSendRPCReplyNode implements TbNode { body.put("requestId", requestIdStr); body.put("response", msg.getData()); EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(ctx.getTenantId(), edgeId, EdgeEventType.DEVICE, - EdgeEventActionType.RPC_CALL, deviceId, JacksonUtil.OBJECT_MAPPER.valueToTree(body)); + EdgeEventActionType.RPC_CALL, deviceId, JacksonUtil.valueToTree(body)); ListenableFuture future = ctx.getEdgeEventService().saveAsync(edgeEvent); Futures.addCallback(future, new FutureCallback<>() { @Override From c1d547265eb845beb289e5f622de8e82f30654d5 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Fri, 13 Oct 2023 15:42:12 +0300 Subject: [PATCH 4/4] Minor refactoring: changes to be in sync with edge, refactor objectmapper calls --- .../edge/rpc/processor/alarm/AlarmEdgeProcessor.java | 8 +++++--- .../edge/rpc/processor/alarm/BaseAlarmProcessor.java | 6 +++--- .../edge/rpc/processor/device/DeviceEdgeProcessor.java | 6 ++---- .../service/edge/rpc/processor/edge/EdgeProcessor.java | 8 ++++---- .../rpc/processor/relation/BaseRelationProcessor.java | 2 +- .../rpc/processor/relation/RelationEdgeProcessor.java | 8 +++----- .../rpc/processor/telemetry/BaseTelemetryProcessor.java | 6 +++--- 7 files changed, 21 insertions(+), 23 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java index b76e8f187d..f002b5c04e 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.edge.rpc.processor.alarm; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -69,13 +68,16 @@ public class AlarmEdgeProcessor extends BaseAlarmProcessor { return null; } - public ListenableFuture processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException { + public ListenableFuture processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); EdgeId sourceEdgeId = safeGetEdgeId(edgeNotificationMsg.getSourceEdgeIdMSB(), edgeNotificationMsg.getSourceEdgeIdLSB()); switch (actionType) { case DELETED: - Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), Alarm.class); + Alarm deletedAlarm = JacksonUtil.fromString(edgeNotificationMsg.getBody(), Alarm.class); + if (deletedAlarm == null) { + return Futures.immediateFuture(null); + } List> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(), alarmId, actionType, JacksonUtil.valueToTree(deletedAlarm), sourceEdgeId); return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java index 03a91ae2f0..3fb086cd52 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java @@ -45,7 +45,7 @@ import java.util.UUID; @Slf4j public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { - public ListenableFuture processAlarmMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) { + protected ListenableFuture processAlarmMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) { EntityId originatorId = getAlarmOriginator(tenantId, alarmUpdateMsg.getOriginatorName(), EntityType.valueOf(alarmUpdateMsg.getOriginatorType())); AlarmId alarmId = new AlarmId(new UUID(alarmUpdateMsg.getIdMSB(), alarmUpdateMsg.getIdLSB())); @@ -72,7 +72,7 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { alarm.setAcknowledged(alarmStatus.isAck()); alarm.setAckTs(alarmUpdateMsg.getAckTs()); alarm.setEndTs(alarmUpdateMsg.getEndTs()); - alarm.setDetails(JacksonUtil.OBJECT_MAPPER.readTree(alarmUpdateMsg.getDetails())); + alarm.setDetails(JacksonUtil.toJsonNode(alarmUpdateMsg.getDetails())); if (UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE.equals(alarmUpdateMsg.getMsgType())) { alarmService.createAlarm(AlarmCreateOrUpdateActiveRequest.fromAlarm(alarm, null, alarmId)); } else { @@ -89,7 +89,7 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { Alarm alarmToClear = alarmService.findAlarmById(tenantId, alarmId); if (alarmToClear != null) { alarmService.clearAlarm(tenantId, alarmId, alarmUpdateMsg.getClearTs(), - JacksonUtil.OBJECT_MAPPER.readTree(alarmUpdateMsg.getDetails())); + JacksonUtil.toJsonNode(alarmUpdateMsg.getDetails())); } break; case ENTITY_DELETED_RPC_MESSAGE: diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java index ad152f754b..c2f18daf0d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.edge.rpc.processor.device; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -40,7 +39,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.data.security.DeviceCredentials; -import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -196,7 +194,7 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor { data.put("method", deviceRpcCallMsg.getRequestMsg().getMethod()); data.put("params", deviceRpcCallMsg.getRequestMsg().getParams()); TbMsg tbMsg = TbMsg.newMsg(TbMsgType.TO_SERVER_RPC_REQUEST, deviceId, null, metaData, - TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(data)); + TbMsgDataType.JSON, JacksonUtil.toString(data)); tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { @@ -210,7 +208,7 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor { tenantId, device, deviceRpcCallMsg, t); } }); - } catch (JsonProcessingException | IllegalArgumentException e) { + } catch (IllegalArgumentException e) { log.warn("[{}][{}] Failed to push TO_SERVER_RPC_REQUEST to rule engine. deviceRpcCallMsg {}", tenantId, deviceId, deviceRpcCallMsg, e); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/edge/EdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/edge/EdgeProcessor.java index b38659c881..daffefe477 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/edge/EdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/edge/EdgeProcessor.java @@ -72,9 +72,9 @@ public class EdgeProcessor extends BaseEdgeProcessor { EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); switch (actionType) { case ASSIGNED_TO_CUSTOMER: - CustomerId customerId = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), CustomerId.class); + CustomerId customerId = JacksonUtil.fromString(edgeNotificationMsg.getBody(), CustomerId.class); Edge edge = edgeService.findEdgeById(tenantId, edgeId); - if (edge == null || customerId.isNullUid()) { + if (customerId != null && (edge == null || customerId.isNullUid())) { return Futures.immediateFuture(null); } List> futures = new ArrayList<>(); @@ -96,9 +96,9 @@ public class EdgeProcessor extends BaseEdgeProcessor { } while (pageData != null && pageData.hasNext()); return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); case UNASSIGNED_FROM_CUSTOMER: - CustomerId customerIdToDelete = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), CustomerId.class); + CustomerId customerIdToDelete = JacksonUtil.fromString(edgeNotificationMsg.getBody(), CustomerId.class); edge = edgeService.findEdgeById(tenantId, edgeId); - if (edge == null || customerIdToDelete.isNullUid()) { + if (customerIdToDelete != null && (edge == null || customerIdToDelete.isNullUid())) { return Futures.immediateFuture(null); } return Futures.transformAsync(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.EDGE, EdgeEventActionType.UNASSIGNED_FROM_CUSTOMER, edgeId, null), diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java index 9729c861ee..952affdc3d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java @@ -33,7 +33,7 @@ import java.util.UUID; @Slf4j public abstract class BaseRelationProcessor extends BaseEdgeProcessor { - public ListenableFuture processRelationMsg(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) { + protected ListenableFuture processRelationMsg(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) { try { EntityRelation entityRelation = new EntityRelation(); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java index 2c23fd42b2..fe76fed545 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.edge.rpc.processor.relation; -import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; @@ -67,10 +66,9 @@ public class RelationEdgeProcessor extends BaseRelationProcessor { .build(); } - public ListenableFuture processRelationNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException { - EntityRelation relation = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), EntityRelation.class); - if (relation.getFrom().getEntityType().equals(EntityType.EDGE) || - relation.getTo().getEntityType().equals(EntityType.EDGE)) { + public ListenableFuture processRelationNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { + EntityRelation relation = JacksonUtil.fromString(edgeNotificationMsg.getBody(), EntityRelation.class); + if (relation == null || (relation.getFrom().getEntityType().equals(EntityType.EDGE) || relation.getTo().getEntityType().equals(EntityType.EDGE))) { return Futures.immediateFuture(null); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java index c2d172ede0..f68d668329 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java @@ -22,7 +22,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.gson.Gson; -import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; @@ -346,8 +345,9 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { log.warn("[{}] Unsupported edge event type [{}]", tenantId, entityType); return null; } - JsonElement entityData = JsonParser.parseString(JacksonUtil.OBJECT_MAPPER.writeValueAsString(body)); - return entityDataMsgConstructor.constructEntityDataMsg(tenantId, entityId, actionType, entityData); + String bodyJackson = JacksonUtil.toString(body); + return bodyJackson == null ? null : + entityDataMsgConstructor.constructEntityDataMsg(tenantId, entityId, actionType, JsonParser.parseString(bodyJackson)); } }