Provide edgeId for EdgeEventSourcingListener to handle cloud entities updated on Edge and process to other Edges

This commit is contained in:
Andrii Landiak 2023-09-05 09:38:48 +03:00
parent 76ea7c95d3
commit e2520ae0f5
24 changed files with 172 additions and 138 deletions

View File

@ -41,7 +41,6 @@ import javax.annotation.PostConstruct;
import static org.thingsboard.server.service.entitiy.DefaultTbNotificationEntityService.edgeTypeByActionType; import static org.thingsboard.server.service.entitiy.DefaultTbNotificationEntityService.edgeTypeByActionType;
/** /**
* This event listener does not support async event processing because relay on ThreadLocal * This event listener does not support async event processing because relay on ThreadLocal
* Another possible approach is to implement a special annotation and a bunch of classes similar to TransactionalApplicationListener * Another possible approach is to implement a special annotation and a bunch of classes similar to TransactionalApplicationListener
@ -71,9 +70,6 @@ public class EdgeEventSourcingListener {
@TransactionalEventListener(fallbackExecution = true) @TransactionalEventListener(fallbackExecution = true)
public void handleEvent(SaveEntityEvent<?> event) { public void handleEvent(SaveEntityEvent<?> event) {
if (edgeSynchronizationManager.isSync()) {
return;
}
try { try {
if (!isValidEdgeEventEntity(event.getEntity())) { if (!isValidEdgeEventEntity(event.getEntity())) {
return; return;
@ -81,7 +77,7 @@ public class EdgeEventSourcingListener {
log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event); log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event);
EdgeEventActionType action = Boolean.TRUE.equals(event.getAdded()) ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED; EdgeEventActionType action = Boolean.TRUE.equals(event.getAdded()) ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED;
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(),
null, null, action); null, null, action, edgeSynchronizationManager.getEdgeId());
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event); log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event);
} }
@ -89,13 +85,11 @@ public class EdgeEventSourcingListener {
@TransactionalEventListener(fallbackExecution = true) @TransactionalEventListener(fallbackExecution = true)
public void handleEvent(DeleteEntityEvent<?> event) { public void handleEvent(DeleteEntityEvent<?> event) {
if (edgeSynchronizationManager.isSync()) {
return;
}
try { try {
log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event); log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event);
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(), tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(),
JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED); JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED,
edgeSynchronizationManager.getEdgeId());
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event); log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event);
} }
@ -103,13 +97,11 @@ public class EdgeEventSourcingListener {
@TransactionalEventListener(fallbackExecution = true) @TransactionalEventListener(fallbackExecution = true)
public void handleEvent(ActionEntityEvent event) { public void handleEvent(ActionEntityEvent event) {
if (edgeSynchronizationManager.isSync()) {
return;
}
try { try {
log.trace("[{}] ActionEntityEvent called: {}", event.getTenantId(), event); log.trace("[{}] ActionEntityEvent called: {}", event.getTenantId(), event);
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(), tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(),
event.getBody(), null, edgeTypeByActionType(event.getActionType())); event.getBody(), null, edgeTypeByActionType(event.getActionType()),
edgeSynchronizationManager.getEdgeId());
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event); log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event);
} }
@ -117,9 +109,6 @@ public class EdgeEventSourcingListener {
@TransactionalEventListener(fallbackExecution = true) @TransactionalEventListener(fallbackExecution = true)
public void handleEvent(RelationActionEvent event) { public void handleEvent(RelationActionEvent event) {
if (edgeSynchronizationManager.isSync()) {
return;
}
try { try {
EntityRelation relation = event.getRelation(); EntityRelation relation = event.getRelation();
if (relation == null) { if (relation == null) {
@ -132,7 +121,8 @@ public class EdgeEventSourcingListener {
} }
log.trace("[{}] RelationActionEvent called: {}", event.getTenantId(), event); log.trace("[{}] RelationActionEvent called: {}", event.getTenantId(), event);
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, null, tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, null,
JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType())); JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType()),
edgeSynchronizationManager.getEdgeId());
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event); log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event);
} }

View File

@ -675,7 +675,7 @@ public final class EdgeGrpcSession implements Closeable {
} }
if (uplinkMsg.getDeviceCredentialsUpdateMsgCount() > 0) { if (uplinkMsg.getDeviceCredentialsUpdateMsgCount() > 0) {
for (DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg : uplinkMsg.getDeviceCredentialsUpdateMsgList()) { for (DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg : uplinkMsg.getDeviceCredentialsUpdateMsgList()) {
result.add(ctx.getDeviceProcessor().processDeviceCredentialsMsg(edge.getTenantId(), deviceCredentialsUpdateMsg)); result.add(ctx.getDeviceProcessor().processDeviceCredentialsMsgFromEdge(edge.getTenantId(), edge.getId(), deviceCredentialsUpdateMsg));
} }
} }
if (uplinkMsg.getAssetProfileUpdateMsgCount() > 0) { if (uplinkMsg.getAssetProfileUpdateMsgCount() > 0) {
@ -690,7 +690,7 @@ public final class EdgeGrpcSession implements Closeable {
} }
if (uplinkMsg.getAlarmUpdateMsgCount() > 0) { if (uplinkMsg.getAlarmUpdateMsgCount() > 0) {
for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdateMsgList()) { for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdateMsgList()) {
result.add(ctx.getAlarmProcessor().processAlarmMsg(edge.getTenantId(), alarmUpdateMsg)); result.add(ctx.getAlarmProcessor().processAlarmMsgFromEdge(edge.getTenantId(), edge.getId(), alarmUpdateMsg));
} }
} }
if (uplinkMsg.getEntityViewUpdateMsgCount() > 0) { if (uplinkMsg.getEntityViewUpdateMsgCount() > 0) {
@ -700,7 +700,7 @@ public final class EdgeGrpcSession implements Closeable {
} }
if (uplinkMsg.getRelationUpdateMsgCount() > 0) { if (uplinkMsg.getRelationUpdateMsgCount() > 0) {
for (RelationUpdateMsg relationUpdateMsg : uplinkMsg.getRelationUpdateMsgList()) { for (RelationUpdateMsg relationUpdateMsg : uplinkMsg.getRelationUpdateMsgList()) {
result.add(ctx.getRelationProcessor().processRelationMsg(edge.getTenantId(), relationUpdateMsg)); result.add(ctx.getRelationProcessor().processRelationMsgFromEdge(edge.getTenantId(), edge, relationUpdateMsg));
} }
} }
if (uplinkMsg.getDashboardUpdateMsgCount() > 0) { if (uplinkMsg.getDashboardUpdateMsgCount() > 0) {

View File

@ -311,7 +311,9 @@ public abstract class BaseEdgeProcessor {
}, dbCallbackExecutorService); }, dbCallbackExecutorService);
} }
protected ListenableFuture<Void> processActionForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId) { protected ListenableFuture<Void> processActionForAllEdges(TenantId tenantId, EdgeEventType type,
EdgeEventActionType actionType, EntityId entityId,
EdgeId sourceEdgeId) {
List<ListenableFuture<Void>> futures = new ArrayList<>(); List<ListenableFuture<Void>> futures = new ArrayList<>();
if (TenantId.SYS_TENANT_ID.equals(tenantId)) { if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
@ -319,21 +321,22 @@ public abstract class BaseEdgeProcessor {
do { do {
tenantsIds = tenantService.findTenantsIds(pageLink); tenantsIds = tenantService.findTenantsIds(pageLink);
for (TenantId tenantId1 : tenantsIds.getData()) { for (TenantId tenantId1 : tenantsIds.getData()) {
futures.addAll(processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, null)); futures.addAll(processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, null, sourceEdgeId));
} }
pageLink = pageLink.nextPageLink(); pageLink = pageLink.nextPageLink();
} while (tenantsIds.hasNext()); } while (tenantsIds.hasNext());
} else { } else {
futures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null); futures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null, sourceEdgeId);
} }
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
} }
private List<ListenableFuture<Void>> processActionForAllEdgesByTenantId(TenantId tenantId, private List<ListenableFuture<Void>> processActionForAllEdgesByTenantId(TenantId tenantId,
EdgeEventType type, EdgeEventType type,
EdgeEventActionType actionType, EdgeEventActionType actionType,
EntityId entityId, EntityId entityId,
JsonNode body) { JsonNode body,
EdgeId sourceEdgeId) {
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
PageData<Edge> pageData; PageData<Edge> pageData;
List<ListenableFuture<Void>> futures = new ArrayList<>(); List<ListenableFuture<Void>> futures = new ArrayList<>();
@ -341,7 +344,9 @@ public abstract class BaseEdgeProcessor {
pageData = edgeService.findEdgesByTenantId(tenantId, pageLink); pageData = edgeService.findEdgesByTenantId(tenantId, pageLink);
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
for (Edge edge : pageData.getData()) { for (Edge edge : pageData.getData()) {
futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, body)); if (!edge.getId().equals(sourceEdgeId)) {
futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, body));
}
} }
if (pageData.hasNext()) { if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink(); pageLink = pageLink.nextPageLink();
@ -385,11 +390,12 @@ public abstract class BaseEdgeProcessor {
EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType()); EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType());
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(type, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(type, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
EdgeId sourceEdgeId = safeGetEdgeId(edgeNotificationMsg.getSourceEdgeIdMSB(), edgeNotificationMsg.getSourceEdgeIdLSB());
if (type.isAllEdgesRelated()) { if (type.isAllEdgesRelated()) {
return processEntityNotificationForAllEdges(tenantId, type, actionType, entityId); return processEntityNotificationForAllEdges(tenantId, type, actionType, entityId, sourceEdgeId);
} else { } else {
JsonNode body = JacksonUtil.toJsonNode(edgeNotificationMsg.getBody()); JsonNode body = JacksonUtil.toJsonNode(edgeNotificationMsg.getBody());
EdgeId edgeId = safeGetEdgeId(edgeNotificationMsg); EdgeId edgeId = safeGetEdgeId(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB());
switch (actionType) { switch (actionType) {
case UPDATED: case UPDATED:
case CREDENTIALS_UPDATED: case CREDENTIALS_UPDATED:
@ -398,41 +404,44 @@ public abstract class BaseEdgeProcessor {
if (edgeId != null) { if (edgeId != null) {
return saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body); return saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body);
} else { } else {
return processNotificationToRelatedEdges(tenantId, entityId, type, actionType); return processNotificationToRelatedEdges(tenantId, entityId, type, actionType, sourceEdgeId);
} }
case DELETED: case DELETED:
EdgeEventActionType deleted = EdgeEventActionType.DELETED; EdgeEventActionType deleted = EdgeEventActionType.DELETED;
if (edgeId != null) { if (edgeId != null) {
return saveEdgeEvent(tenantId, edgeId, type, deleted, entityId, body); return saveEdgeEvent(tenantId, edgeId, type, deleted, entityId, body);
} else { } else {
return Futures.transform(Futures.allAsList(processActionForAllEdgesByTenantId(tenantId, type, deleted, entityId, body)), return Futures.transform(Futures.allAsList(processActionForAllEdgesByTenantId(tenantId, type, deleted, entityId, body, sourceEdgeId)),
voids -> null, dbCallbackExecutorService); voids -> null, dbCallbackExecutorService);
} }
case ASSIGNED_TO_EDGE: case ASSIGNED_TO_EDGE:
case UNASSIGNED_FROM_EDGE: case UNASSIGNED_FROM_EDGE:
ListenableFuture<Void> future = saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body); if (sourceEdgeId == null) {
return Futures.transformAsync(future, unused -> { ListenableFuture<Void> future = saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body);
if (type.equals(EdgeEventType.RULE_CHAIN)) { return Futures.transformAsync(future, unused -> {
return updateDependentRuleChains(tenantId, new RuleChainId(entityId.getId()), edgeId); if (type.equals(EdgeEventType.RULE_CHAIN)) {
} else { return updateDependentRuleChains(tenantId, new RuleChainId(entityId.getId()), edgeId);
return Futures.immediateFuture(null); } else {
} return Futures.immediateFuture(null);
}, dbCallbackExecutorService); }
}, dbCallbackExecutorService);
}
default: default:
return Futures.immediateFuture(null); return Futures.immediateFuture(null);
} }
} }
} }
private EdgeId safeGetEdgeId(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { protected EdgeId safeGetEdgeId(long edgeIdMSB, long edgeIdLSB) {
if (edgeNotificationMsg.getEdgeIdMSB() != 0 && edgeNotificationMsg.getEdgeIdLSB() != 0) { if (edgeIdMSB != 0 && edgeIdLSB != 0) {
return new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB())); return new EdgeId(new UUID(edgeIdMSB, edgeIdLSB));
} else { } else {
return null; return null;
} }
} }
private ListenableFuture<Void> processNotificationToRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type, EdgeEventActionType actionType) { private ListenableFuture<Void> processNotificationToRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type,
EdgeEventActionType actionType, EdgeId sourceEdgeId) {
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
PageData<EdgeId> pageData; PageData<EdgeId> pageData;
List<ListenableFuture<Void>> futures = new ArrayList<>(); List<ListenableFuture<Void>> futures = new ArrayList<>();
@ -440,7 +449,9 @@ public abstract class BaseEdgeProcessor {
pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink); pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink);
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
for (EdgeId relatedEdgeId : pageData.getData()) { for (EdgeId relatedEdgeId : pageData.getData()) {
futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null)); if (!relatedEdgeId.equals(sourceEdgeId)) {
futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null));
}
} }
if (pageData.hasNext()) { if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink(); pageLink = pageLink.nextPageLink();
@ -483,13 +494,13 @@ public abstract class BaseEdgeProcessor {
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
} }
private ListenableFuture<Void> processEntityNotificationForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId) { private ListenableFuture<Void> processEntityNotificationForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, EdgeId sourceEdgeId) {
switch (actionType) { switch (actionType) {
case ADDED: case ADDED:
case UPDATED: case UPDATED:
case DELETED: case DELETED:
case CREDENTIALS_UPDATED: // used by USER entity case CREDENTIALS_UPDATED: // used by USER entity
return processActionForAllEdges(tenantId, type, actionType, entityId); return processActionForAllEdges(tenantId, type, actionType, entityId, sourceEdgeId);
default: default:
return Futures.immediateFuture(null); return Futures.immediateFuture(null);
} }

View File

@ -47,6 +47,16 @@ import java.util.UUID;
@TbCoreComponent @TbCoreComponent
public class AlarmEdgeProcessor extends BaseAlarmProcessor { public class AlarmEdgeProcessor extends BaseAlarmProcessor {
public ListenableFuture<Void> processAlarmMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmUpdateMsg alarmUpdateMsg) {
log.trace("[{}] processAlarmMsgFromEdge [{}]", tenantId, alarmUpdateMsg);
try {
edgeSynchronizationManager.getSync().set(edgeId);
return processAlarmMsg(tenantId, alarmUpdateMsg);
} finally {
edgeSynchronizationManager.getSync().remove();
}
}
public DownlinkMsg convertAlarmEventToDownlink(EdgeEvent edgeEvent) { public DownlinkMsg convertAlarmEventToDownlink(EdgeEvent edgeEvent) {
AlarmUpdateMsg alarmUpdateMsg = AlarmUpdateMsg alarmUpdateMsg =
convertAlarmEventToAlarmMsg(edgeEvent.getTenantId(), edgeEvent.getEntityId(), edgeEvent.getAction(), edgeEvent.getBody()); convertAlarmEventToAlarmMsg(edgeEvent.getTenantId(), edgeEvent.getEntityId(), edgeEvent.getAction(), edgeEvent.getBody());
@ -62,10 +72,12 @@ public class AlarmEdgeProcessor extends BaseAlarmProcessor {
public ListenableFuture<Void> processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException { public ListenableFuture<Void> processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException {
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
EdgeId sourceEdgeId = safeGetEdgeId(edgeNotificationMsg.getSourceEdgeIdMSB(), edgeNotificationMsg.getSourceEdgeIdLSB());
switch (actionType) { switch (actionType) {
case DELETED: case DELETED:
Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), Alarm.class); Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), Alarm.class);
List<ListenableFuture<Void>> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(), alarmId, actionType, JacksonUtil.OBJECT_MAPPER.valueToTree(deletedAlarm)); List<ListenableFuture<Void>> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(),
alarmId, actionType, JacksonUtil.OBJECT_MAPPER.valueToTree(deletedAlarm), sourceEdgeId);
return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService);
default: default:
ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId);
@ -77,13 +89,14 @@ public class AlarmEdgeProcessor extends BaseAlarmProcessor {
if (type == null) { if (type == null) {
return Futures.immediateFuture(null); return Futures.immediateFuture(null);
} }
List<ListenableFuture<Void>> futures = pushEventToAllRelatedEdges(tenantId, alarm.getOriginator(), alarmId, actionType, null); List<ListenableFuture<Void>> futures = pushEventToAllRelatedEdges(tenantId, alarm.getOriginator(),
alarmId, actionType, null, sourceEdgeId);
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
}, dbCallbackExecutorService); }, dbCallbackExecutorService);
} }
} }
private List<ListenableFuture<Void>> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId, EdgeEventActionType actionType, JsonNode body) { private List<ListenableFuture<Void>> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId, EdgeEventActionType actionType, JsonNode body, EdgeId sourceEdgeId) {
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
PageData<EdgeId> pageData; PageData<EdgeId> pageData;
List<ListenableFuture<Void>> futures = new ArrayList<>(); List<ListenableFuture<Void>> futures = new ArrayList<>();
@ -91,12 +104,14 @@ public class AlarmEdgeProcessor extends BaseAlarmProcessor {
pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, originatorId, pageLink); pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, originatorId, pageLink);
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
for (EdgeId relatedEdgeId : pageData.getData()) { for (EdgeId relatedEdgeId : pageData.getData()) {
futures.add(saveEdgeEvent(tenantId, if (!relatedEdgeId.equals(sourceEdgeId)) {
relatedEdgeId, futures.add(saveEdgeEvent(tenantId,
EdgeEventType.ALARM, relatedEdgeId,
actionType, EdgeEventType.ALARM,
alarmId, actionType,
body)); alarmId,
body));
}
} }
if (pageData.hasNext()) { if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink(); pageLink = pageLink.nextPageLink();

View File

@ -40,7 +40,6 @@ import java.util.UUID;
public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { public abstract class BaseAlarmProcessor extends BaseEdgeProcessor {
public ListenableFuture<Void> processAlarmMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) { public ListenableFuture<Void> processAlarmMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) {
log.trace("[{}] processAlarmMsg [{}]", tenantId, alarmUpdateMsg);
EntityId originatorId = getAlarmOriginator(tenantId, alarmUpdateMsg.getOriginatorName(), EntityId originatorId = getAlarmOriginator(tenantId, alarmUpdateMsg.getOriginatorName(),
EntityType.valueOf(alarmUpdateMsg.getOriginatorType())); EntityType.valueOf(alarmUpdateMsg.getOriginatorType()));
AlarmId alarmId = new AlarmId(new UUID(alarmUpdateMsg.getIdMSB(), alarmUpdateMsg.getIdLSB())); AlarmId alarmId = new AlarmId(new UUID(alarmUpdateMsg.getIdMSB(), alarmUpdateMsg.getIdLSB()));
@ -49,7 +48,7 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor {
return Futures.immediateFuture(null); return Futures.immediateFuture(null);
} }
try { try {
edgeSynchronizationManager.getSync().set(true);
switch (alarmUpdateMsg.getMsgType()) { switch (alarmUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE: case ENTITY_CREATED_RPC_MESSAGE:
case ENTITY_UPDATED_RPC_MESSAGE: case ENTITY_UPDATED_RPC_MESSAGE:
@ -100,26 +99,11 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor {
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] Failed to process alarm update msg [{}]", tenantId, alarmUpdateMsg, e); log.error("[{}] Failed to process alarm update msg [{}]", tenantId, alarmUpdateMsg, e);
return Futures.immediateFailedFuture(e); return Futures.immediateFailedFuture(e);
} finally {
edgeSynchronizationManager.getSync().remove();
} }
return Futures.immediateFuture(null); return Futures.immediateFuture(null);
} }
private EntityId getAlarmOriginator(TenantId tenantId, String entityName, EntityType entityType) { protected AlarmUpdateMsg convertAlarmEventToAlarmMsg(TenantId tenantId, UUID entityId, EdgeEventActionType actionType, JsonNode body) {
switch (entityType) {
case DEVICE:
return deviceService.findDeviceByTenantIdAndName(tenantId, entityName).getId();
case ASSET:
return assetService.findAssetByTenantIdAndName(tenantId, entityName).getId();
case ENTITY_VIEW:
return entityViewService.findEntityViewByTenantIdAndName(tenantId, entityName).getId();
default:
return null;
}
}
public AlarmUpdateMsg convertAlarmEventToAlarmMsg(TenantId tenantId, UUID entityId, EdgeEventActionType actionType, JsonNode body) {
AlarmId alarmId = new AlarmId(entityId); AlarmId alarmId = new AlarmId(entityId);
UpdateMsgType msgType = getUpdateMsgType(actionType); UpdateMsgType msgType = getUpdateMsgType(actionType);
switch (actionType) { switch (actionType) {
@ -138,4 +122,17 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor {
} }
return null; return null;
} }
private EntityId getAlarmOriginator(TenantId tenantId, String entityName, EntityType entityType) {
switch (entityType) {
case DEVICE:
return deviceService.findDeviceByTenantIdAndName(tenantId, entityName).getId();
case ASSET:
return assetService.findAssetByTenantIdAndName(tenantId, entityName).getId();
case ENTITY_VIEW:
return entityViewService.findEntityViewByTenantIdAndName(tenantId, entityName).getId();
default:
return null;
}
}
} }

View File

@ -57,7 +57,7 @@ public class AssetEdgeProcessor extends BaseAssetProcessor {
log.trace("[{}] executing processAssetMsgFromEdge [{}] from edge [{}]", tenantId, assetUpdateMsg, edge.getName()); log.trace("[{}] executing processAssetMsgFromEdge [{}] from edge [{}]", tenantId, assetUpdateMsg, edge.getName());
AssetId assetId = new AssetId(new UUID(assetUpdateMsg.getIdMSB(), assetUpdateMsg.getIdLSB())); AssetId assetId = new AssetId(new UUID(assetUpdateMsg.getIdMSB(), assetUpdateMsg.getIdLSB()));
try { try {
edgeSynchronizationManager.getSync().set(true); edgeSynchronizationManager.getSync().set(edge.getId());
switch (assetUpdateMsg.getMsgType()) { switch (assetUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE: case ENTITY_CREATED_RPC_MESSAGE:

View File

@ -51,7 +51,7 @@ public class AssetProfileEdgeProcessor extends BaseAssetProfileProcessor {
log.trace("[{}] executing processAssetProfileMsgFromEdge [{}] from edge [{}]", tenantId, assetProfileUpdateMsg, edge.getName()); log.trace("[{}] executing processAssetProfileMsgFromEdge [{}] from edge [{}]", tenantId, assetProfileUpdateMsg, edge.getName());
AssetProfileId assetProfileId = new AssetProfileId(new UUID(assetProfileUpdateMsg.getIdMSB(), assetProfileUpdateMsg.getIdLSB())); AssetProfileId assetProfileId = new AssetProfileId(new UUID(assetProfileUpdateMsg.getIdMSB(), assetProfileUpdateMsg.getIdLSB()));
try { try {
edgeSynchronizationManager.getSync().set(true); edgeSynchronizationManager.getSync().set(edge.getId());
switch (assetProfileUpdateMsg.getMsgType()) { switch (assetProfileUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE: case ENTITY_CREATED_RPC_MESSAGE:

View File

@ -52,7 +52,7 @@ public class DashboardEdgeProcessor extends BaseDashboardProcessor {
log.trace("[{}] executing processDashboardMsgFromEdge [{}] from edge [{}]", tenantId, dashboardUpdateMsg, edge.getName()); log.trace("[{}] executing processDashboardMsgFromEdge [{}] from edge [{}]", tenantId, dashboardUpdateMsg, edge.getName());
DashboardId dashboardId = new DashboardId(new UUID(dashboardUpdateMsg.getIdMSB(), dashboardUpdateMsg.getIdLSB())); DashboardId dashboardId = new DashboardId(new UUID(dashboardUpdateMsg.getIdMSB(), dashboardUpdateMsg.getIdLSB()));
try { try {
edgeSynchronizationManager.getSync().set(true); edgeSynchronizationManager.getSync().set(edge.getId());
switch (dashboardUpdateMsg.getMsgType()) { switch (dashboardUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE: case ENTITY_CREATED_RPC_MESSAGE:

View File

@ -16,7 +16,6 @@
package org.thingsboard.server.service.edge.rpc.processor.device; package org.thingsboard.server.service.edge.rpc.processor.device;
import com.datastax.oss.driver.api.core.uuid.Uuids; import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.util.Pair; import org.springframework.data.util.Pair;
@ -104,34 +103,27 @@ public abstract class BaseDeviceProcessor extends BaseEdgeProcessor {
return Pair.of(created, deviceNameUpdated); return Pair.of(created, deviceNameUpdated);
} }
public ListenableFuture<Void> processDeviceCredentialsMsg(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) { protected void updateDeviceCredentials(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
log.debug("[{}] Executing processDeviceCredentialsMsg, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg);
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB())); DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB()));
return dbCallbackExecutorService.submit(() -> { Device device = deviceService.findDeviceById(tenantId, deviceId);
Device device = deviceService.findDeviceById(tenantId, deviceId); if (device != null) {
if (device != null) { log.debug("Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]",
log.debug("Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]", device.getName(), deviceCredentialsUpdateMsg.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsValue());
device.getName(), deviceCredentialsUpdateMsg.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsValue()); try {
try { DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(tenantId, device.getId());
edgeSynchronizationManager.getSync().set(true); deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(deviceCredentialsUpdateMsg.getCredentialsType()));
deviceCredentials.setCredentialsId(deviceCredentialsUpdateMsg.getCredentialsId());
deviceCredentials.setCredentialsValue(deviceCredentialsUpdateMsg.hasCredentialsValue()
? deviceCredentialsUpdateMsg.getCredentialsValue() : null);
deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials);
DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(tenantId, device.getId()); } catch (Exception e) {
deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(deviceCredentialsUpdateMsg.getCredentialsType())); log.error("Can't update device credentials for device [{}], deviceCredentialsUpdateMsg [{}]",
deviceCredentials.setCredentialsId(deviceCredentialsUpdateMsg.getCredentialsId()); device.getName(), deviceCredentialsUpdateMsg, e);
deviceCredentials.setCredentialsValue(deviceCredentialsUpdateMsg.hasCredentialsValue() throw new RuntimeException(e);
? deviceCredentialsUpdateMsg.getCredentialsValue() : null);
deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials);
} catch (Exception e) {
log.error("Can't update device credentials for device [{}], deviceCredentialsUpdateMsg [{}]",
device.getName(), deviceCredentialsUpdateMsg, e);
throw new RuntimeException(e);
} finally {
edgeSynchronizationManager.getSync().remove();
}
} else {
log.warn("Can't find device by id [{}], deviceCredentialsUpdateMsg [{}]", deviceId, deviceCredentialsUpdateMsg);
} }
return null; } else {
}); log.warn("Can't find device by id [{}], deviceCredentialsUpdateMsg [{}]", deviceId, deviceCredentialsUpdateMsg);
}
} }
} }

View File

@ -35,10 +35,12 @@ import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
@ -66,7 +68,7 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor {
log.trace("[{}] executing processDeviceMsgFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); log.trace("[{}] executing processDeviceMsgFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName());
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
try { try {
edgeSynchronizationManager.getSync().set(true); edgeSynchronizationManager.getSync().set(edge.getId());
switch (deviceUpdateMsg.getMsgType()) { switch (deviceUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE: case ENTITY_CREATED_RPC_MESSAGE:
@ -95,6 +97,18 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor {
} }
} }
public ListenableFuture<Void> processDeviceCredentialsMsgFromEdge(TenantId tenantId, EdgeId edgeId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
log.debug("[{}] Executing processDeviceCredentialsMsgFromEdge, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg);
try {
edgeSynchronizationManager.getSync().set(edgeId);
updateDeviceCredentials(tenantId, deviceCredentialsUpdateMsg);
} finally {
edgeSynchronizationManager.getSync().remove();
}
return Futures.immediateFuture(null);
}
private void saveOrUpdateDevice(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg, Edge edge) { private void saveOrUpdateDevice(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg, Edge edge) {
CustomerId customerId = safeGetCustomerId(deviceUpdateMsg.getCustomerIdMSB(), deviceUpdateMsg.getCustomerIdLSB()); CustomerId customerId = safeGetCustomerId(deviceUpdateMsg.getCustomerIdMSB(), deviceUpdateMsg.getCustomerIdLSB());
Pair<Boolean, Boolean> resultPair = super.saveOrUpdateDevice(tenantId, deviceId, deviceUpdateMsg, customerId); Pair<Boolean, Boolean> resultPair = super.saveOrUpdateDevice(tenantId, deviceId, deviceUpdateMsg, customerId);

View File

@ -52,7 +52,7 @@ public class DeviceProfileEdgeProcessor extends BaseDeviceProfileProcessor {
log.trace("[{}] executing processDeviceProfileMsgFromEdge [{}] from edge [{}]", tenantId, deviceProfileUpdateMsg, edge.getName()); log.trace("[{}] executing processDeviceProfileMsgFromEdge [{}] from edge [{}]", tenantId, deviceProfileUpdateMsg, edge.getName());
DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileUpdateMsg.getIdMSB(), deviceProfileUpdateMsg.getIdLSB())); DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileUpdateMsg.getIdMSB(), deviceProfileUpdateMsg.getIdLSB()));
try { try {
edgeSynchronizationManager.getSync().set(true); edgeSynchronizationManager.getSync().set(edge.getId());
switch (deviceProfileUpdateMsg.getMsgType()) { switch (deviceProfileUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE: case ENTITY_CREATED_RPC_MESSAGE:

View File

@ -55,7 +55,7 @@ public class EntityViewEdgeProcessor extends BaseEntityViewProcessor {
log.trace("[{}] executing processEntityViewMsgFromEdge [{}] from edge [{}]", tenantId, entityViewUpdateMsg, edge.getName()); log.trace("[{}] executing processEntityViewMsgFromEdge [{}] from edge [{}]", tenantId, entityViewUpdateMsg, edge.getName());
EntityViewId entityViewId = new EntityViewId(new UUID(entityViewUpdateMsg.getIdMSB(), entityViewUpdateMsg.getIdLSB())); EntityViewId entityViewId = new EntityViewId(new UUID(entityViewUpdateMsg.getIdMSB(), entityViewUpdateMsg.getIdLSB()));
try { try {
edgeSynchronizationManager.getSync().set(true); edgeSynchronizationManager.getSync().set(edge.getId());
switch (entityViewUpdateMsg.getMsgType()) { switch (entityViewUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE: case ENTITY_CREATED_RPC_MESSAGE:

View File

@ -34,9 +34,7 @@ import java.util.UUID;
public abstract class BaseRelationProcessor extends BaseEdgeProcessor { public abstract class BaseRelationProcessor extends BaseEdgeProcessor {
public ListenableFuture<Void> processRelationMsg(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) { public ListenableFuture<Void> processRelationMsg(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) {
log.trace("[{}] processRelationMsg [{}]", tenantId, relationUpdateMsg);
try { try {
edgeSynchronizationManager.getSync().set(true);
EntityRelation entityRelation = new EntityRelation(); EntityRelation entityRelation = new EntityRelation();
UUID fromUUID = new UUID(relationUpdateMsg.getFromIdMSB(), relationUpdateMsg.getFromIdLSB()); UUID fromUUID = new UUID(relationUpdateMsg.getFromIdMSB(), relationUpdateMsg.getFromIdLSB());
@ -72,8 +70,6 @@ public abstract class BaseRelationProcessor extends BaseEdgeProcessor {
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] Failed to process relation update msg [{}]", tenantId, relationUpdateMsg, e); log.error("[{}] Failed to process relation update msg [{}]", tenantId, relationUpdateMsg, e);
return Futures.immediateFailedFuture(e); return Futures.immediateFailedFuture(e);
} finally {
edgeSynchronizationManager.getSync().remove();
} }
return Futures.immediateFuture(null); return Futures.immediateFuture(null);
} }

View File

@ -23,6 +23,7 @@ import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.edge.EdgeEventType;
@ -45,6 +46,17 @@ import java.util.Set;
@TbCoreComponent @TbCoreComponent
public class RelationEdgeProcessor extends BaseRelationProcessor { public class RelationEdgeProcessor extends BaseRelationProcessor {
public ListenableFuture<Void> processRelationMsgFromEdge(TenantId tenantId, Edge edge, RelationUpdateMsg relationUpdateMsg) {
log.trace("[{}] executing processRelationMsgFromEdge [{}] from edge [{}]", tenantId, relationUpdateMsg, edge.getName());
try {
edgeSynchronizationManager.getSync().set(edge.getId());
return processRelationMsg(tenantId, relationUpdateMsg);
} finally {
edgeSynchronizationManager.getSync().remove();
}
}
public DownlinkMsg convertRelationEventToDownlink(EdgeEvent edgeEvent) { public DownlinkMsg convertRelationEventToDownlink(EdgeEvent edgeEvent) {
EntityRelation entityRelation = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), EntityRelation.class); EntityRelation entityRelation = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), EntityRelation.class);
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());

View File

@ -475,7 +475,7 @@ public class DefaultTbClusterService implements TbClusterService {
} }
@Override @Override
public void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action) { public void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action, EdgeId sourceEdgeId) {
if (!edgesEnabled) { if (!edgesEnabled) {
return; return;
} }
@ -508,6 +508,10 @@ public class DefaultTbClusterService implements TbClusterService {
if (body != null) { if (body != null) {
builder.setBody(body); builder.setBody(body);
} }
if (sourceEdgeId != null) {
builder.setSourceEdgeIdMSB(sourceEdgeId.getId().getMostSignificantBits());
builder.setSourceEdgeIdLSB(sourceEdgeId.getId().getLeastSignificantBits());
}
TransportProtos.EdgeNotificationMsgProto msg = builder.build(); TransportProtos.EdgeNotificationMsgProto msg = builder.build();
log.trace("[{}] sending notification to edge service {}", tenantId.getId(), msg); log.trace("[{}] sending notification to edge service {}", tenantId.getId(), msg);
pushMsgToCore(tenantId, entityId != null ? entityId : tenantId, TransportProtos.ToCoreMsg.newBuilder().setEdgeNotificationMsg(msg).build(), null); pushMsgToCore(tenantId, entityId != null ? entityId : tenantId, TransportProtos.ToCoreMsg.newBuilder().setEdgeNotificationMsg(msg).build(), null);

View File

@ -91,7 +91,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
int cntTime = 1; int cntTime = 1;
Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId),
Mockito.isNull(), Mockito.isNull(), Mockito.any(), Mockito.eq(EdgeEventType.RELATION), Mockito.isNull(), Mockito.isNull(), Mockito.any(), Mockito.eq(EdgeEventType.RELATION),
Mockito.eq(edgeTypeByActionType(actionType))); Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any());
ArgumentMatcher<EntityId> matcherOriginatorId = argument -> argument.equals(relation.getTo()); ArgumentMatcher<EntityId> matcherOriginatorId = argument -> argument.equals(relation.getTo());
ArgumentMatcher<HasName> matcherEntityClassEquals = Objects::isNull; ArgumentMatcher<HasName> matcherEntityClassEquals = Objects::isNull;
ArgumentMatcher<CustomerId> matcherCustomerId = customerId == null ? ArgumentMatcher<CustomerId> matcherCustomerId = customerId == null ?
@ -111,7 +111,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
ActionType actionType, int cntTime) { ActionType actionType, int cntTime) {
Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId),
Mockito.isNull(), Mockito.isNull(), Mockito.any(), Mockito.eq(EdgeEventType.RELATION), Mockito.isNull(), Mockito.isNull(), Mockito.any(), Mockito.eq(EdgeEventType.RELATION),
Mockito.eq(edgeTypeByActionType(actionType))); Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any());
ArgumentMatcher<EntityId> matcherOriginatorId = argument -> argument.getClass().equals(relation.getFrom().getClass()); ArgumentMatcher<EntityId> matcherOriginatorId = argument -> argument.getClass().equals(relation.getFrom().getClass());
ArgumentMatcher<HasName> matcherEntityClassEquals = Objects::isNull; ArgumentMatcher<HasName> matcherEntityClassEquals = Objects::isNull;
ArgumentMatcher<CustomerId> matcherCustomerId = customerId == null ? ArgumentMatcher<CustomerId> matcherCustomerId = customerId == null ?
@ -318,13 +318,13 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
private void testNotificationMsgToEdgeServiceNeverWithActionType(EntityId entityId, ActionType actionType) { private void testNotificationMsgToEdgeServiceNeverWithActionType(EntityId entityId, ActionType actionType) {
EdgeEventActionType edgeEventActionType = ActionType.CREDENTIALS_UPDATED.equals(actionType) ? EdgeEventActionType edgeEventActionType = ActionType.CREDENTIALS_UPDATED.equals(actionType) ?
EdgeEventActionType.CREDENTIALS_UPDATED : edgeTypeByActionType(actionType); EdgeEventActionType.CREDENTIALS_UPDATED : edgeTypeByActionType(actionType);
Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(), Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(), Mockito.any(),
Mockito.any(), Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.eq(edgeEventActionType)); Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.eq(edgeEventActionType), Mockito.any());
} }
private void testNotificationMsgToEdgeServiceNever(EntityId entityId) { private void testNotificationMsgToEdgeServiceNever(EntityId entityId) {
Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(), Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(), Mockito.any(),
Mockito.any(), Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.any()); Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
} }
private void testLogEntityActionNever(EntityId entityId, HasName entity) { private void testLogEntityActionNever(EntityId entityId, HasName entity) {
@ -358,13 +358,13 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
argument -> argument.getClass().equals(entityId.getClass()); argument -> argument.getClass().equals(entityId.getClass());
Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId),
Mockito.any(), Mockito.argThat(matcherEntityId), Mockito.any(), Mockito.isNull(), Mockito.any(), Mockito.argThat(matcherEntityId), Mockito.any(), Mockito.isNull(),
Mockito.eq(edgeEventActionType)); Mockito.eq(edgeEventActionType), Mockito.any());
} }
private void testSendNotificationMsgToEdgeServiceTimeEntityEqAny(TenantId tenantId, ActionType actionType, int cntTime) { private void testSendNotificationMsgToEdgeServiceTimeEntityEqAny(TenantId tenantId, ActionType actionType, int cntTime) {
Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId),
Mockito.any(), Mockito.any(EntityId.class), Mockito.any(), Mockito.isNull(), Mockito.any(), Mockito.any(EntityId.class), Mockito.any(), Mockito.isNull(),
Mockito.eq(edgeTypeByActionType(actionType))); Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any());
} }
protected void testBroadcastEntityStateChangeEventTime(EntityId entityId, TenantId tenantId, int cntTime) { protected void testBroadcastEntityStateChangeEventTime(EntityId entityId, TenantId tenantId, int cntTime) {

View File

@ -213,7 +213,7 @@ public class AssetEdgeTest extends AbstractEdgeTest {
testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder);
edgeImitator.expectResponsesAmount(1); edgeImitator.expectResponsesAmount(1);
edgeImitator.expectMessageAmount(1); edgeImitator.expectMessageAmount(2);
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());

View File

@ -548,16 +548,16 @@ public class DeviceEdgeTest extends AbstractEdgeTest {
uplinkMsgBuilder.addDeviceUpdateMsg(deviceUpdateMsgBuilder.build()); uplinkMsgBuilder.addDeviceUpdateMsg(deviceUpdateMsgBuilder.build());
edgeImitator.expectResponsesAmount(1); edgeImitator.expectResponsesAmount(1);
edgeImitator.expectMessageAmount(1); edgeImitator.expectMessageAmount(2);
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
Assert.assertTrue(edgeImitator.waitForResponses()); Assert.assertTrue(edgeImitator.waitForResponses());
Assert.assertTrue(edgeImitator.waitForMessages()); Assert.assertTrue(edgeImitator.waitForMessages());
AbstractMessage latestMessage = edgeImitator.getLatestMessage(); Optional<DeviceCredentialsRequestMsg> deviceCredentialsRequestMsgOpt = edgeImitator.findMessageByType(DeviceCredentialsRequestMsg.class);
Assert.assertTrue(latestMessage instanceof DeviceCredentialsRequestMsg); Assert.assertTrue(deviceCredentialsRequestMsgOpt.isPresent());
DeviceCredentialsRequestMsg latestDeviceCredentialsRequestMsg = (DeviceCredentialsRequestMsg) latestMessage; DeviceCredentialsRequestMsg latestDeviceCredentialsRequestMsg = deviceCredentialsRequestMsgOpt.get();
Assert.assertEquals(uuid.getMostSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdMSB()); Assert.assertEquals(uuid.getMostSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdMSB());
Assert.assertEquals(uuid.getLeastSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB()); Assert.assertEquals(uuid.getLeastSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB());

View File

@ -555,7 +555,7 @@ public class ExportImportServiceSqlTest extends BaseExportImportServiceTest {
Customer updatedCustomer = importEntity(tenantAdmin2, updatedCustomerEntity).getSavedEntity(); Customer updatedCustomer = importEntity(tenantAdmin2, updatedCustomerEntity).getSavedEntity();
verify(entityActionService).logEntityAction(any(), eq(importedCustomer.getId()), eq(updatedCustomer), verify(entityActionService).logEntityAction(any(), eq(importedCustomer.getId()), eq(updatedCustomer),
any(), eq(ActionType.UPDATED), isNull()); any(), eq(ActionType.UPDATED), isNull());
verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedCustomer.getId()), any(), any(), eq(EdgeEventActionType.UPDATED)); verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedCustomer.getId()), any(), any(), eq(EdgeEventActionType.UPDATED), any());
Mockito.reset(entityActionService); Mockito.reset(entityActionService);
@ -572,7 +572,7 @@ public class ExportImportServiceSqlTest extends BaseExportImportServiceTest {
verify(entityActionService).logEntityAction(any(), eq(importedAssetProfile.getId()), eq(importedAssetProfile), verify(entityActionService).logEntityAction(any(), eq(importedAssetProfile.getId()), eq(importedAssetProfile),
any(), eq(ActionType.ADDED), isNull()); any(), eq(ActionType.ADDED), isNull());
verify(tbClusterService).broadcastEntityStateChangeEvent(any(), eq(importedAssetProfile.getId()), eq(ComponentLifecycleEvent.CREATED)); verify(tbClusterService).broadcastEntityStateChangeEvent(any(), eq(importedAssetProfile.getId()), eq(ComponentLifecycleEvent.CREATED));
verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedAssetProfile.getId()), any(), any(), eq(EdgeEventActionType.ADDED)); verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedAssetProfile.getId()), any(), any(), eq(EdgeEventActionType.ADDED), any());
Asset importedAsset = (Asset) importEntity(tenantAdmin2, getAndClone(entitiesExportData, EntityType.ASSET)).getSavedEntity(); Asset importedAsset = (Asset) importEntity(tenantAdmin2, getAndClone(entitiesExportData, EntityType.ASSET)).getSavedEntity();
verify(entityActionService).logEntityAction(any(), eq(importedAsset.getId()), eq(importedAsset), verify(entityActionService).logEntityAction(any(), eq(importedAsset.getId()), eq(importedAsset),
@ -588,14 +588,14 @@ public class ExportImportServiceSqlTest extends BaseExportImportServiceTest {
verify(entityActionService).logEntityAction(any(), eq(importedAsset.getId()), eq(updatedAsset), verify(entityActionService).logEntityAction(any(), eq(importedAsset.getId()), eq(updatedAsset),
any(), eq(ActionType.UPDATED), isNull()); any(), eq(ActionType.UPDATED), isNull());
verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedAsset.getId()), any(), any(), eq(EdgeEventActionType.UPDATED)); verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedAsset.getId()), any(), any(), eq(EdgeEventActionType.UPDATED), any());
DeviceProfile importedDeviceProfile = (DeviceProfile) importEntity(tenantAdmin2, getAndClone(entitiesExportData, EntityType.DEVICE_PROFILE)).getSavedEntity(); DeviceProfile importedDeviceProfile = (DeviceProfile) importEntity(tenantAdmin2, getAndClone(entitiesExportData, EntityType.DEVICE_PROFILE)).getSavedEntity();
verify(entityActionService).logEntityAction(any(), eq(importedDeviceProfile.getId()), eq(importedDeviceProfile), verify(entityActionService).logEntityAction(any(), eq(importedDeviceProfile.getId()), eq(importedDeviceProfile),
any(), eq(ActionType.ADDED), isNull()); any(), eq(ActionType.ADDED), isNull());
verify(tbClusterService).onDeviceProfileChange(eq(importedDeviceProfile), any()); verify(tbClusterService).onDeviceProfileChange(eq(importedDeviceProfile), any());
verify(tbClusterService).broadcastEntityStateChangeEvent(any(), eq(importedDeviceProfile.getId()), eq(ComponentLifecycleEvent.CREATED)); verify(tbClusterService).broadcastEntityStateChangeEvent(any(), eq(importedDeviceProfile.getId()), eq(ComponentLifecycleEvent.CREATED));
verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedDeviceProfile.getId()), any(), any(), eq(EdgeEventActionType.ADDED)); verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedDeviceProfile.getId()), any(), any(), eq(EdgeEventActionType.ADDED), any());
verify(otaPackageStateService).update(eq(importedDeviceProfile), eq(false), eq(false)); verify(otaPackageStateService).update(eq(importedDeviceProfile), eq(false), eq(false));
Device importedDevice = (Device) importEntity(tenantAdmin2, getAndClone(entitiesExportData, EntityType.DEVICE)).getSavedEntity(); Device importedDevice = (Device) importEntity(tenantAdmin2, getAndClone(entitiesExportData, EntityType.DEVICE)).getSavedEntity();

View File

@ -92,6 +92,6 @@ public interface TbClusterService extends TbQueueClusterService {
void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse); void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse);
void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action); void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action, EdgeId sourceEdgeId);
} }

View File

@ -744,6 +744,8 @@ message EdgeNotificationMsgProto {
string body = 10; string body = 10;
PostTelemetryMsg postTelemetryMsg = 11; PostTelemetryMsg postTelemetryMsg = 11;
PostAttributeMsg postAttributesMsg = 12; PostAttributeMsg postAttributesMsg = 12;
int64 sourceEdgeIdMSB = 13;
int64 sourceEdgeIdLSB = 14;
} }
/** /**

View File

@ -15,9 +15,11 @@
*/ */
package org.thingsboard.server.dao.edge; package org.thingsboard.server.dao.edge;
import org.thingsboard.server.common.data.id.EdgeId;
public interface EdgeSynchronizationManager { public interface EdgeSynchronizationManager {
ThreadLocal<Boolean> getSync(); ThreadLocal<EdgeId> getSync();
boolean isSync(); EdgeId getEdgeId();
} }

View File

@ -18,17 +18,17 @@ package org.thingsboard.server.dao.edge;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.EdgeId;
@Component @Component
@Slf4j @Slf4j
public class DefaultEdgeSynchronizationManager implements EdgeSynchronizationManager { public class DefaultEdgeSynchronizationManager implements EdgeSynchronizationManager {
@Getter @Getter
private final ThreadLocal<Boolean> sync = new ThreadLocal<>(); private final ThreadLocal<EdgeId> sync = new ThreadLocal<>();
@Override @Override
public boolean isSync() { public EdgeId getEdgeId() {
Boolean sync = this.sync.get(); return this.sync.get();
return sync != null && sync;
} }
} }

View File

@ -26,6 +26,5 @@ import org.thingsboard.server.common.data.id.TenantId;
public class DeleteEntityEvent<T> { public class DeleteEntityEvent<T> {
private final TenantId tenantId; private final TenantId tenantId;
private final EntityId entityId; private final EntityId entityId;
private final EdgeId edgeId;
private final T entity; private final T entity;
} }