Merge branch 'feature/edge-sync-improvement' into feature/edge-resource-support
This commit is contained in:
commit
1378cafbef
@ -45,7 +45,6 @@ import javax.annotation.PostConstruct;
|
|||||||
|
|
||||||
import static org.thingsboard.server.service.entitiy.DefaultTbNotificationEntityService.edgeTypeByActionType;
|
import static org.thingsboard.server.service.entitiy.DefaultTbNotificationEntityService.edgeTypeByActionType;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This event listener does not support async event processing because relay on ThreadLocal
|
* This event listener does not support async event processing because relay on ThreadLocal
|
||||||
* Another possible approach is to implement a special annotation and a bunch of classes similar to TransactionalApplicationListener
|
* Another possible approach is to implement a special annotation and a bunch of classes similar to TransactionalApplicationListener
|
||||||
@ -75,9 +74,6 @@ public class EdgeEventSourcingListener {
|
|||||||
|
|
||||||
@TransactionalEventListener(fallbackExecution = true)
|
@TransactionalEventListener(fallbackExecution = true)
|
||||||
public void handleEvent(SaveEntityEvent<?> event) {
|
public void handleEvent(SaveEntityEvent<?> event) {
|
||||||
if (edgeSynchronizationManager.isSync()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
if (!isValidSaveEntityEventForEdgeProcessing(event.getEntity(), event.getOldEntity())) {
|
if (!isValidSaveEntityEventForEdgeProcessing(event.getEntity(), event.getOldEntity())) {
|
||||||
return;
|
return;
|
||||||
@ -85,7 +81,7 @@ public class EdgeEventSourcingListener {
|
|||||||
log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event);
|
log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event);
|
||||||
EdgeEventActionType action = Boolean.TRUE.equals(event.getAdded()) ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED;
|
EdgeEventActionType action = Boolean.TRUE.equals(event.getAdded()) ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED;
|
||||||
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(),
|
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(),
|
||||||
null, null, action);
|
null, null, action, edgeSynchronizationManager.getEdgeId().get());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event, e);
|
log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event, e);
|
||||||
}
|
}
|
||||||
@ -93,13 +89,11 @@ public class EdgeEventSourcingListener {
|
|||||||
|
|
||||||
@TransactionalEventListener(fallbackExecution = true)
|
@TransactionalEventListener(fallbackExecution = true)
|
||||||
public void handleEvent(DeleteEntityEvent<?> event) {
|
public void handleEvent(DeleteEntityEvent<?> event) {
|
||||||
if (edgeSynchronizationManager.isSync()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event);
|
log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event);
|
||||||
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(),
|
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(),
|
||||||
JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED);
|
JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED,
|
||||||
|
edgeSynchronizationManager.getEdgeId().get());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event, e);
|
log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event, e);
|
||||||
}
|
}
|
||||||
@ -107,13 +101,11 @@ public class EdgeEventSourcingListener {
|
|||||||
|
|
||||||
@TransactionalEventListener(fallbackExecution = true)
|
@TransactionalEventListener(fallbackExecution = true)
|
||||||
public void handleEvent(ActionEntityEvent event) {
|
public void handleEvent(ActionEntityEvent event) {
|
||||||
if (edgeSynchronizationManager.isSync()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
log.trace("[{}] ActionEntityEvent called: {}", event.getTenantId(), event);
|
log.trace("[{}] ActionEntityEvent called: {}", event.getTenantId(), event);
|
||||||
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(),
|
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(),
|
||||||
event.getBody(), null, edgeTypeByActionType(event.getActionType()));
|
event.getBody(), null, edgeTypeByActionType(event.getActionType()),
|
||||||
|
edgeSynchronizationManager.getEdgeId().get());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event, e);
|
log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event, e);
|
||||||
}
|
}
|
||||||
@ -121,9 +113,6 @@ public class EdgeEventSourcingListener {
|
|||||||
|
|
||||||
@TransactionalEventListener(fallbackExecution = true)
|
@TransactionalEventListener(fallbackExecution = true)
|
||||||
public void handleEvent(RelationActionEvent event) {
|
public void handleEvent(RelationActionEvent event) {
|
||||||
if (edgeSynchronizationManager.isSync()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
EntityRelation relation = event.getRelation();
|
EntityRelation relation = event.getRelation();
|
||||||
if (relation == null) {
|
if (relation == null) {
|
||||||
@ -136,7 +125,8 @@ public class EdgeEventSourcingListener {
|
|||||||
}
|
}
|
||||||
log.trace("[{}] RelationActionEvent called: {}", event.getTenantId(), event);
|
log.trace("[{}] RelationActionEvent called: {}", event.getTenantId(), event);
|
||||||
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, null,
|
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, null,
|
||||||
JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType()));
|
JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType()),
|
||||||
|
edgeSynchronizationManager.getEdgeId().get());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event, e);
|
log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event, e);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -680,7 +680,7 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
}
|
}
|
||||||
if (uplinkMsg.getDeviceCredentialsUpdateMsgCount() > 0) {
|
if (uplinkMsg.getDeviceCredentialsUpdateMsgCount() > 0) {
|
||||||
for (DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg : uplinkMsg.getDeviceCredentialsUpdateMsgList()) {
|
for (DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg : uplinkMsg.getDeviceCredentialsUpdateMsgList()) {
|
||||||
result.add(ctx.getDeviceProcessor().processDeviceCredentialsMsg(edge.getTenantId(), deviceCredentialsUpdateMsg));
|
result.add(ctx.getDeviceProcessor().processDeviceCredentialsMsgFromEdge(edge.getTenantId(), edge.getId(), deviceCredentialsUpdateMsg));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (uplinkMsg.getAssetProfileUpdateMsgCount() > 0) {
|
if (uplinkMsg.getAssetProfileUpdateMsgCount() > 0) {
|
||||||
@ -695,7 +695,7 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
}
|
}
|
||||||
if (uplinkMsg.getAlarmUpdateMsgCount() > 0) {
|
if (uplinkMsg.getAlarmUpdateMsgCount() > 0) {
|
||||||
for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdateMsgList()) {
|
for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdateMsgList()) {
|
||||||
result.add(ctx.getAlarmProcessor().processAlarmMsg(edge.getTenantId(), alarmUpdateMsg));
|
result.add(ctx.getAlarmProcessor().processAlarmMsgFromEdge(edge.getTenantId(), edge.getId(), alarmUpdateMsg));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (uplinkMsg.getEntityViewUpdateMsgCount() > 0) {
|
if (uplinkMsg.getEntityViewUpdateMsgCount() > 0) {
|
||||||
@ -705,7 +705,7 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
}
|
}
|
||||||
if (uplinkMsg.getRelationUpdateMsgCount() > 0) {
|
if (uplinkMsg.getRelationUpdateMsgCount() > 0) {
|
||||||
for (RelationUpdateMsg relationUpdateMsg : uplinkMsg.getRelationUpdateMsgList()) {
|
for (RelationUpdateMsg relationUpdateMsg : uplinkMsg.getRelationUpdateMsgList()) {
|
||||||
result.add(ctx.getRelationProcessor().processRelationMsg(edge.getTenantId(), relationUpdateMsg));
|
result.add(ctx.getRelationProcessor().processRelationMsgFromEdge(edge.getTenantId(), edge, relationUpdateMsg));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (uplinkMsg.getDashboardUpdateMsgCount() > 0) {
|
if (uplinkMsg.getDashboardUpdateMsgCount() > 0) {
|
||||||
|
|||||||
@ -15,7 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.edge.rpc.constructor.rule;
|
package org.thingsboard.server.service.edge.rpc.constructor.rule;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
||||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -50,7 +49,7 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM
|
|||||||
constructRuleChainMetadataUpdatedMsg(tenantId, builder, ruleChainMetaData);
|
constructRuleChainMetadataUpdatedMsg(tenantId, builder, ruleChainMetaData);
|
||||||
builder.setMsgType(msgType);
|
builder.setMsgType(msgType);
|
||||||
return builder.build();
|
return builder.build();
|
||||||
} catch (JsonProcessingException ex) {
|
} catch (Exception ex) {
|
||||||
log.error("[{}] Can't construct RuleChainMetadataUpdateMsg", tenantId, ex);
|
log.error("[{}] Can't construct RuleChainMetadataUpdateMsg", tenantId, ex);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
@ -58,7 +57,7 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM
|
|||||||
|
|
||||||
protected abstract void constructRuleChainMetadataUpdatedMsg(TenantId tenantId,
|
protected abstract void constructRuleChainMetadataUpdatedMsg(TenantId tenantId,
|
||||||
RuleChainMetadataUpdateMsg.Builder builder,
|
RuleChainMetadataUpdateMsg.Builder builder,
|
||||||
RuleChainMetaData ruleChainMetaData) throws JsonProcessingException;
|
RuleChainMetaData ruleChainMetaData);
|
||||||
|
|
||||||
protected List<NodeConnectionInfoProto> constructConnections(List<NodeConnectionInfo> connections) {
|
protected List<NodeConnectionInfoProto> constructConnections(List<NodeConnectionInfo> connections) {
|
||||||
List<NodeConnectionInfoProto> result = new ArrayList<>();
|
List<NodeConnectionInfoProto> result = new ArrayList<>();
|
||||||
@ -78,7 +77,7 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<RuleNodeProto> constructNodes(List<RuleNode> nodes) throws JsonProcessingException {
|
protected List<RuleNodeProto> constructNodes(List<RuleNode> nodes) {
|
||||||
List<RuleNodeProto> result = new ArrayList<>();
|
List<RuleNodeProto> result = new ArrayList<>();
|
||||||
if (nodes != null && !nodes.isEmpty()) {
|
if (nodes != null && !nodes.isEmpty()) {
|
||||||
for (RuleNode node : nodes) {
|
for (RuleNode node : nodes) {
|
||||||
@ -88,22 +87,22 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private RuleNodeProto constructNode(RuleNode node) throws JsonProcessingException {
|
private RuleNodeProto constructNode(RuleNode node) {
|
||||||
return RuleNodeProto.newBuilder()
|
return RuleNodeProto.newBuilder()
|
||||||
.setIdMSB(node.getId().getId().getMostSignificantBits())
|
.setIdMSB(node.getId().getId().getMostSignificantBits())
|
||||||
.setIdLSB(node.getId().getId().getLeastSignificantBits())
|
.setIdLSB(node.getId().getId().getLeastSignificantBits())
|
||||||
.setType(node.getType())
|
.setType(node.getType())
|
||||||
.setName(node.getName())
|
.setName(node.getName())
|
||||||
.setDebugMode(node.isDebugMode())
|
.setDebugMode(node.isDebugMode())
|
||||||
.setConfiguration(JacksonUtil.OBJECT_MAPPER.writeValueAsString(node.getConfiguration()))
|
.setConfiguration(JacksonUtil.toString(node.getConfiguration()))
|
||||||
.setAdditionalInfo(JacksonUtil.OBJECT_MAPPER.writeValueAsString(node.getAdditionalInfo()))
|
.setAdditionalInfo(JacksonUtil.toString(node.getAdditionalInfo()))
|
||||||
.setSingletonMode(node.isSingletonMode())
|
.setSingletonMode(node.isSingletonMode())
|
||||||
.setConfigurationVersion(node.getConfigurationVersion())
|
.setConfigurationVersion(node.getConfigurationVersion())
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<RuleChainConnectionInfoProto> constructRuleChainConnections(List<RuleChainConnectionInfo> ruleChainConnections,
|
protected List<RuleChainConnectionInfoProto> constructRuleChainConnections(List<RuleChainConnectionInfo> ruleChainConnections,
|
||||||
NavigableSet<Integer> removedNodeIndexes) throws JsonProcessingException {
|
NavigableSet<Integer> removedNodeIndexes) {
|
||||||
List<RuleChainConnectionInfoProto> result = new ArrayList<>();
|
List<RuleChainConnectionInfoProto> result = new ArrayList<>();
|
||||||
if (ruleChainConnections != null && !ruleChainConnections.isEmpty()) {
|
if (ruleChainConnections != null && !ruleChainConnections.isEmpty()) {
|
||||||
for (RuleChainConnectionInfo ruleChainConnectionInfo : ruleChainConnections) {
|
for (RuleChainConnectionInfo ruleChainConnectionInfo : ruleChainConnections) {
|
||||||
@ -127,13 +126,13 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private RuleChainConnectionInfoProto constructRuleChainConnection(RuleChainConnectionInfo ruleChainConnectionInfo) throws JsonProcessingException {
|
private RuleChainConnectionInfoProto constructRuleChainConnection(RuleChainConnectionInfo ruleChainConnectionInfo) {
|
||||||
return RuleChainConnectionInfoProto.newBuilder()
|
return RuleChainConnectionInfoProto.newBuilder()
|
||||||
.setFromIndex(ruleChainConnectionInfo.getFromIndex())
|
.setFromIndex(ruleChainConnectionInfo.getFromIndex())
|
||||||
.setTargetRuleChainIdMSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getMostSignificantBits())
|
.setTargetRuleChainIdMSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getMostSignificantBits())
|
||||||
.setTargetRuleChainIdLSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getLeastSignificantBits())
|
.setTargetRuleChainIdLSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getLeastSignificantBits())
|
||||||
.setType(ruleChainConnectionInfo.getType())
|
.setType(ruleChainConnectionInfo.getType())
|
||||||
.setAdditionalInfo(JacksonUtil.OBJECT_MAPPER.writeValueAsString(ruleChainConnectionInfo.getAdditionalInfo()))
|
.setAdditionalInfo(JacksonUtil.toString(ruleChainConnectionInfo.getAdditionalInfo()))
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,7 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.edge.rpc.constructor.rule;
|
package org.thingsboard.server.service.edge.rpc.constructor.rule;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.rule.engine.flow.TbRuleChainInputNode;
|
import org.thingsboard.rule.engine.flow.TbRuleChainInputNode;
|
||||||
@ -45,7 +44,7 @@ public class RuleChainMetadataConstructorV330 extends AbstractRuleChainMetadataC
|
|||||||
@Override
|
@Override
|
||||||
protected void constructRuleChainMetadataUpdatedMsg(TenantId tenantId,
|
protected void constructRuleChainMetadataUpdatedMsg(TenantId tenantId,
|
||||||
RuleChainMetadataUpdateMsg.Builder builder,
|
RuleChainMetadataUpdateMsg.Builder builder,
|
||||||
RuleChainMetaData ruleChainMetaData) throws JsonProcessingException {
|
RuleChainMetaData ruleChainMetaData) {
|
||||||
List<RuleNode> supportedNodes = filterNodes(ruleChainMetaData.getNodes());
|
List<RuleNode> supportedNodes = filterNodes(ruleChainMetaData.getNodes());
|
||||||
|
|
||||||
NavigableSet<Integer> removedNodeIndexes = getRemovedNodeIndexes(ruleChainMetaData.getNodes(), ruleChainMetaData.getConnections());
|
NavigableSet<Integer> removedNodeIndexes = getRemovedNodeIndexes(ruleChainMetaData.getNodes(), ruleChainMetaData.getConnections());
|
||||||
|
|||||||
@ -15,7 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.edge.rpc.constructor.rule;
|
package org.thingsboard.server.service.edge.rpc.constructor.rule;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
|
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
|
||||||
@ -29,7 +28,7 @@ public class RuleChainMetadataConstructorV340 extends AbstractRuleChainMetadataC
|
|||||||
@Override
|
@Override
|
||||||
protected void constructRuleChainMetadataUpdatedMsg(TenantId tenantId,
|
protected void constructRuleChainMetadataUpdatedMsg(TenantId tenantId,
|
||||||
RuleChainMetadataUpdateMsg.Builder builder,
|
RuleChainMetadataUpdateMsg.Builder builder,
|
||||||
RuleChainMetaData ruleChainMetaData) throws JsonProcessingException {
|
RuleChainMetaData ruleChainMetaData) {
|
||||||
builder.addAllNodes(constructNodes(ruleChainMetaData.getNodes()))
|
builder.addAllNodes(constructNodes(ruleChainMetaData.getNodes()))
|
||||||
.addAllConnections(constructConnections(ruleChainMetaData.getConnections()))
|
.addAllConnections(constructConnections(ruleChainMetaData.getConnections()))
|
||||||
.addAllRuleChainConnections(constructRuleChainConnections(ruleChainMetaData.getRuleChainConnections(), new TreeSet<>()));
|
.addAllRuleChainConnections(constructRuleChainConnections(ruleChainMetaData.getRuleChainConnections(), new TreeSet<>()));
|
||||||
|
|||||||
@ -79,19 +79,19 @@ public class AdminSettingsEdgeEventFetcher implements EdgeEventFetcher {
|
|||||||
|
|
||||||
AdminSettings systemMailSettings = adminSettingsService.findAdminSettingsByKey(TenantId.SYS_TENANT_ID, "mail");
|
AdminSettings systemMailSettings = adminSettingsService.findAdminSettingsByKey(TenantId.SYS_TENANT_ID, "mail");
|
||||||
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
||||||
EdgeEventActionType.UPDATED, null, JacksonUtil.OBJECT_MAPPER.valueToTree(systemMailSettings)));
|
EdgeEventActionType.UPDATED, null, JacksonUtil.valueToTree(systemMailSettings)));
|
||||||
|
|
||||||
AdminSettings tenantMailSettings = convertToTenantAdminSettings(tenantId, systemMailSettings.getKey(), (ObjectNode) systemMailSettings.getJsonValue());
|
AdminSettings tenantMailSettings = convertToTenantAdminSettings(tenantId, systemMailSettings.getKey(), (ObjectNode) systemMailSettings.getJsonValue());
|
||||||
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
||||||
EdgeEventActionType.UPDATED, null, JacksonUtil.OBJECT_MAPPER.valueToTree(tenantMailSettings)));
|
EdgeEventActionType.UPDATED, null, JacksonUtil.valueToTree(tenantMailSettings)));
|
||||||
|
|
||||||
AdminSettings systemMailTemplates = loadMailTemplates(tenantId);
|
AdminSettings systemMailTemplates = loadMailTemplates(tenantId);
|
||||||
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
||||||
EdgeEventActionType.UPDATED, null, JacksonUtil.OBJECT_MAPPER.valueToTree(systemMailTemplates)));
|
EdgeEventActionType.UPDATED, null, JacksonUtil.valueToTree(systemMailTemplates)));
|
||||||
|
|
||||||
AdminSettings tenantMailTemplates = convertToTenantAdminSettings(tenantId, systemMailTemplates.getKey(), (ObjectNode) systemMailTemplates.getJsonValue());
|
AdminSettings tenantMailTemplates = convertToTenantAdminSettings(tenantId, systemMailTemplates.getKey(), (ObjectNode) systemMailTemplates.getJsonValue());
|
||||||
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
||||||
EdgeEventActionType.UPDATED, null, JacksonUtil.OBJECT_MAPPER.valueToTree(tenantMailTemplates)));
|
EdgeEventActionType.UPDATED, null, JacksonUtil.valueToTree(tenantMailTemplates)));
|
||||||
|
|
||||||
// return PageData object to be in sync with other fetchers
|
// return PageData object to be in sync with other fetchers
|
||||||
return new PageData<>(result, 1, result.size(), false);
|
return new PageData<>(result, 1, result.size(), false);
|
||||||
@ -114,7 +114,7 @@ public class AdminSettingsEdgeEventFetcher implements EdgeEventFetcher {
|
|||||||
AdminSettings adminSettings = new AdminSettings();
|
AdminSettings adminSettings = new AdminSettings();
|
||||||
adminSettings.setId(new AdminSettingsId(Uuids.timeBased()));
|
adminSettings.setId(new AdminSettingsId(Uuids.timeBased()));
|
||||||
adminSettings.setKey("mailTemplates");
|
adminSettings.setKey("mailTemplates");
|
||||||
adminSettings.setJsonValue(JacksonUtil.OBJECT_MAPPER.convertValue(mailTemplates, JsonNode.class));
|
adminSettings.setJsonValue(JacksonUtil.convertValue(mailTemplates, JsonNode.class));
|
||||||
return adminSettings;
|
return adminSettings;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -388,7 +388,9 @@ public abstract class BaseEdgeProcessor {
|
|||||||
}, dbCallbackExecutorService);
|
}, dbCallbackExecutorService);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ListenableFuture<Void> processActionForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId) {
|
protected ListenableFuture<Void> processActionForAllEdges(TenantId tenantId, EdgeEventType type,
|
||||||
|
EdgeEventActionType actionType, EntityId entityId,
|
||||||
|
EdgeId sourceEdgeId) {
|
||||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||||
if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
|
if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
|
||||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||||
@ -396,21 +398,22 @@ public abstract class BaseEdgeProcessor {
|
|||||||
do {
|
do {
|
||||||
tenantsIds = tenantService.findTenantsIds(pageLink);
|
tenantsIds = tenantService.findTenantsIds(pageLink);
|
||||||
for (TenantId tenantId1 : tenantsIds.getData()) {
|
for (TenantId tenantId1 : tenantsIds.getData()) {
|
||||||
futures.addAll(processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, null));
|
futures.addAll(processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, null, sourceEdgeId));
|
||||||
}
|
}
|
||||||
pageLink = pageLink.nextPageLink();
|
pageLink = pageLink.nextPageLink();
|
||||||
} while (tenantsIds.hasNext());
|
} while (tenantsIds.hasNext());
|
||||||
} else {
|
} else {
|
||||||
futures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null);
|
futures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null, sourceEdgeId);
|
||||||
}
|
}
|
||||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<ListenableFuture<Void>> processActionForAllEdgesByTenantId(TenantId tenantId,
|
private List<ListenableFuture<Void>> processActionForAllEdgesByTenantId(TenantId tenantId,
|
||||||
EdgeEventType type,
|
EdgeEventType type,
|
||||||
EdgeEventActionType actionType,
|
EdgeEventActionType actionType,
|
||||||
EntityId entityId,
|
EntityId entityId,
|
||||||
JsonNode body) {
|
JsonNode body,
|
||||||
|
EdgeId sourceEdgeId) {
|
||||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||||
PageData<Edge> pageData;
|
PageData<Edge> pageData;
|
||||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||||
@ -418,7 +421,9 @@ public abstract class BaseEdgeProcessor {
|
|||||||
pageData = edgeService.findEdgesByTenantId(tenantId, pageLink);
|
pageData = edgeService.findEdgesByTenantId(tenantId, pageLink);
|
||||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
||||||
for (Edge edge : pageData.getData()) {
|
for (Edge edge : pageData.getData()) {
|
||||||
futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, body));
|
if (!edge.getId().equals(sourceEdgeId)) {
|
||||||
|
futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, body));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (pageData.hasNext()) {
|
if (pageData.hasNext()) {
|
||||||
pageLink = pageLink.nextPageLink();
|
pageLink = pageLink.nextPageLink();
|
||||||
@ -462,11 +467,12 @@ public abstract class BaseEdgeProcessor {
|
|||||||
EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType());
|
EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType());
|
||||||
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
|
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
|
||||||
EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(type, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
|
EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(type, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
|
||||||
|
EdgeId sourceEdgeId = safeGetEdgeId(edgeNotificationMsg.getSourceEdgeIdMSB(), edgeNotificationMsg.getSourceEdgeIdLSB());
|
||||||
if (type.isAllEdgesRelated()) {
|
if (type.isAllEdgesRelated()) {
|
||||||
return processEntityNotificationForAllEdges(tenantId, type, actionType, entityId);
|
return processEntityNotificationForAllEdges(tenantId, type, actionType, entityId, sourceEdgeId);
|
||||||
} else {
|
} else {
|
||||||
JsonNode body = JacksonUtil.toJsonNode(edgeNotificationMsg.getBody());
|
JsonNode body = JacksonUtil.toJsonNode(edgeNotificationMsg.getBody());
|
||||||
EdgeId edgeId = safeGetEdgeId(edgeNotificationMsg);
|
EdgeId edgeId = safeGetEdgeId(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB());
|
||||||
switch (actionType) {
|
switch (actionType) {
|
||||||
case UPDATED:
|
case UPDATED:
|
||||||
case CREDENTIALS_UPDATED:
|
case CREDENTIALS_UPDATED:
|
||||||
@ -475,41 +481,46 @@ public abstract class BaseEdgeProcessor {
|
|||||||
if (edgeId != null) {
|
if (edgeId != null) {
|
||||||
return saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body);
|
return saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body);
|
||||||
} else {
|
} else {
|
||||||
return processNotificationToRelatedEdges(tenantId, entityId, type, actionType);
|
return processNotificationToRelatedEdges(tenantId, entityId, type, actionType, sourceEdgeId);
|
||||||
}
|
}
|
||||||
case DELETED:
|
case DELETED:
|
||||||
EdgeEventActionType deleted = EdgeEventActionType.DELETED;
|
EdgeEventActionType deleted = EdgeEventActionType.DELETED;
|
||||||
if (edgeId != null) {
|
if (edgeId != null) {
|
||||||
return saveEdgeEvent(tenantId, edgeId, type, deleted, entityId, body);
|
return saveEdgeEvent(tenantId, edgeId, type, deleted, entityId, body);
|
||||||
} else {
|
} else {
|
||||||
return Futures.transform(Futures.allAsList(processActionForAllEdgesByTenantId(tenantId, type, deleted, entityId, body)),
|
return Futures.transform(Futures.allAsList(processActionForAllEdgesByTenantId(tenantId, type, deleted, entityId, body, sourceEdgeId)),
|
||||||
voids -> null, dbCallbackExecutorService);
|
voids -> null, dbCallbackExecutorService);
|
||||||
}
|
}
|
||||||
case ASSIGNED_TO_EDGE:
|
case ASSIGNED_TO_EDGE:
|
||||||
case UNASSIGNED_FROM_EDGE:
|
case UNASSIGNED_FROM_EDGE:
|
||||||
ListenableFuture<Void> future = saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body);
|
if (sourceEdgeId == null) {
|
||||||
return Futures.transformAsync(future, unused -> {
|
ListenableFuture<Void> future = saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body);
|
||||||
if (type.equals(EdgeEventType.RULE_CHAIN)) {
|
return Futures.transformAsync(future, unused -> {
|
||||||
return updateDependentRuleChains(tenantId, new RuleChainId(entityId.getId()), edgeId);
|
if (type.equals(EdgeEventType.RULE_CHAIN)) {
|
||||||
} else {
|
return updateDependentRuleChains(tenantId, new RuleChainId(entityId.getId()), edgeId);
|
||||||
return Futures.immediateFuture(null);
|
} else {
|
||||||
}
|
return Futures.immediateFuture(null);
|
||||||
}, dbCallbackExecutorService);
|
}
|
||||||
|
}, dbCallbackExecutorService);
|
||||||
|
} else {
|
||||||
|
return Futures.immediateFuture(null);
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private EdgeId safeGetEdgeId(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
protected EdgeId safeGetEdgeId(long edgeIdMSB, long edgeIdLSB) {
|
||||||
if (edgeNotificationMsg.getEdgeIdMSB() != 0 && edgeNotificationMsg.getEdgeIdLSB() != 0) {
|
if (edgeIdMSB != 0 && edgeIdLSB != 0) {
|
||||||
return new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB()));
|
return new EdgeId(new UUID(edgeIdMSB, edgeIdLSB));
|
||||||
} else {
|
} else {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<Void> processNotificationToRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type, EdgeEventActionType actionType) {
|
private ListenableFuture<Void> processNotificationToRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type,
|
||||||
|
EdgeEventActionType actionType, EdgeId sourceEdgeId) {
|
||||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||||
PageData<EdgeId> pageData;
|
PageData<EdgeId> pageData;
|
||||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||||
@ -517,7 +528,9 @@ public abstract class BaseEdgeProcessor {
|
|||||||
pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink);
|
pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink);
|
||||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
||||||
for (EdgeId relatedEdgeId : pageData.getData()) {
|
for (EdgeId relatedEdgeId : pageData.getData()) {
|
||||||
futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null));
|
if (!relatedEdgeId.equals(sourceEdgeId)) {
|
||||||
|
futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (pageData.hasNext()) {
|
if (pageData.hasNext()) {
|
||||||
pageLink = pageLink.nextPageLink();
|
pageLink = pageLink.nextPageLink();
|
||||||
@ -560,13 +573,13 @@ public abstract class BaseEdgeProcessor {
|
|||||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<Void> processEntityNotificationForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId) {
|
private ListenableFuture<Void> processEntityNotificationForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, EdgeId sourceEdgeId) {
|
||||||
switch (actionType) {
|
switch (actionType) {
|
||||||
case ADDED:
|
case ADDED:
|
||||||
case UPDATED:
|
case UPDATED:
|
||||||
case DELETED:
|
case DELETED:
|
||||||
case CREDENTIALS_UPDATED: // used by USER entity
|
case CREDENTIALS_UPDATED: // used by USER entity
|
||||||
return processActionForAllEdges(tenantId, type, actionType, entityId);
|
return processActionForAllEdges(tenantId, type, actionType, entityId, sourceEdgeId);
|
||||||
default:
|
default:
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,7 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.edge.rpc.processor.alarm;
|
package org.thingsboard.server.service.edge.rpc.processor.alarm;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
@ -47,6 +46,16 @@ import java.util.UUID;
|
|||||||
@TbCoreComponent
|
@TbCoreComponent
|
||||||
public class AlarmEdgeProcessor extends BaseAlarmProcessor {
|
public class AlarmEdgeProcessor extends BaseAlarmProcessor {
|
||||||
|
|
||||||
|
public ListenableFuture<Void> processAlarmMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmUpdateMsg alarmUpdateMsg) {
|
||||||
|
log.trace("[{}] processAlarmMsgFromEdge [{}]", tenantId, alarmUpdateMsg);
|
||||||
|
try {
|
||||||
|
edgeSynchronizationManager.getEdgeId().set(edgeId);
|
||||||
|
return processAlarmMsg(tenantId, alarmUpdateMsg);
|
||||||
|
} finally {
|
||||||
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public DownlinkMsg convertAlarmEventToDownlink(EdgeEvent edgeEvent) {
|
public DownlinkMsg convertAlarmEventToDownlink(EdgeEvent edgeEvent) {
|
||||||
AlarmUpdateMsg alarmUpdateMsg =
|
AlarmUpdateMsg alarmUpdateMsg =
|
||||||
convertAlarmEventToAlarmMsg(edgeEvent.getTenantId(), edgeEvent.getEntityId(), edgeEvent.getAction(), edgeEvent.getBody());
|
convertAlarmEventToAlarmMsg(edgeEvent.getTenantId(), edgeEvent.getEntityId(), edgeEvent.getAction(), edgeEvent.getBody());
|
||||||
@ -59,13 +68,18 @@ public class AlarmEdgeProcessor extends BaseAlarmProcessor {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ListenableFuture<Void> processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException {
|
public ListenableFuture<Void> processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
||||||
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
|
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
|
||||||
AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
|
AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
|
||||||
|
EdgeId sourceEdgeId = safeGetEdgeId(edgeNotificationMsg.getSourceEdgeIdMSB(), edgeNotificationMsg.getSourceEdgeIdLSB());
|
||||||
switch (actionType) {
|
switch (actionType) {
|
||||||
case DELETED:
|
case DELETED:
|
||||||
Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), Alarm.class);
|
Alarm deletedAlarm = JacksonUtil.fromString(edgeNotificationMsg.getBody(), Alarm.class);
|
||||||
List<ListenableFuture<Void>> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(), alarmId, actionType, JacksonUtil.OBJECT_MAPPER.valueToTree(deletedAlarm));
|
if (deletedAlarm == null) {
|
||||||
|
return Futures.immediateFuture(null);
|
||||||
|
}
|
||||||
|
List<ListenableFuture<Void>> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(),
|
||||||
|
alarmId, actionType, JacksonUtil.valueToTree(deletedAlarm), sourceEdgeId);
|
||||||
return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService);
|
return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService);
|
||||||
default:
|
default:
|
||||||
ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId);
|
ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId);
|
||||||
@ -77,13 +91,14 @@ public class AlarmEdgeProcessor extends BaseAlarmProcessor {
|
|||||||
if (type == null) {
|
if (type == null) {
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
List<ListenableFuture<Void>> futures = pushEventToAllRelatedEdges(tenantId, alarm.getOriginator(), alarmId, actionType, null);
|
List<ListenableFuture<Void>> futures = pushEventToAllRelatedEdges(tenantId, alarm.getOriginator(),
|
||||||
|
alarmId, actionType, null, sourceEdgeId);
|
||||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||||
}, dbCallbackExecutorService);
|
}, dbCallbackExecutorService);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<ListenableFuture<Void>> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId, EdgeEventActionType actionType, JsonNode body) {
|
private List<ListenableFuture<Void>> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId, EdgeEventActionType actionType, JsonNode body, EdgeId sourceEdgeId) {
|
||||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||||
PageData<EdgeId> pageData;
|
PageData<EdgeId> pageData;
|
||||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||||
@ -91,12 +106,14 @@ public class AlarmEdgeProcessor extends BaseAlarmProcessor {
|
|||||||
pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, originatorId, pageLink);
|
pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, originatorId, pageLink);
|
||||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
||||||
for (EdgeId relatedEdgeId : pageData.getData()) {
|
for (EdgeId relatedEdgeId : pageData.getData()) {
|
||||||
futures.add(saveEdgeEvent(tenantId,
|
if (!relatedEdgeId.equals(sourceEdgeId)) {
|
||||||
relatedEdgeId,
|
futures.add(saveEdgeEvent(tenantId,
|
||||||
EdgeEventType.ALARM,
|
relatedEdgeId,
|
||||||
actionType,
|
EdgeEventType.ALARM,
|
||||||
alarmId,
|
actionType,
|
||||||
body));
|
alarmId,
|
||||||
|
body));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (pageData.hasNext()) {
|
if (pageData.hasNext()) {
|
||||||
pageLink = pageLink.nextPageLink();
|
pageLink = pageLink.nextPageLink();
|
||||||
|
|||||||
@ -45,8 +45,7 @@ import java.util.UUID;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public abstract class BaseAlarmProcessor extends BaseEdgeProcessor {
|
public abstract class BaseAlarmProcessor extends BaseEdgeProcessor {
|
||||||
|
|
||||||
public ListenableFuture<Void> processAlarmMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) {
|
protected ListenableFuture<Void> processAlarmMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) {
|
||||||
log.trace("[{}] processAlarmMsg [{}]", tenantId, alarmUpdateMsg);
|
|
||||||
EntityId originatorId = getAlarmOriginator(tenantId, alarmUpdateMsg.getOriginatorName(),
|
EntityId originatorId = getAlarmOriginator(tenantId, alarmUpdateMsg.getOriginatorName(),
|
||||||
EntityType.valueOf(alarmUpdateMsg.getOriginatorType()));
|
EntityType.valueOf(alarmUpdateMsg.getOriginatorType()));
|
||||||
AlarmId alarmId = new AlarmId(new UUID(alarmUpdateMsg.getIdMSB(), alarmUpdateMsg.getIdLSB()));
|
AlarmId alarmId = new AlarmId(new UUID(alarmUpdateMsg.getIdMSB(), alarmUpdateMsg.getIdLSB()));
|
||||||
@ -55,7 +54,7 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor {
|
|||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(true);
|
|
||||||
switch (alarmUpdateMsg.getMsgType()) {
|
switch (alarmUpdateMsg.getMsgType()) {
|
||||||
case ENTITY_CREATED_RPC_MESSAGE:
|
case ENTITY_CREATED_RPC_MESSAGE:
|
||||||
case ENTITY_UPDATED_RPC_MESSAGE:
|
case ENTITY_UPDATED_RPC_MESSAGE:
|
||||||
@ -73,7 +72,7 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor {
|
|||||||
alarm.setAcknowledged(alarmStatus.isAck());
|
alarm.setAcknowledged(alarmStatus.isAck());
|
||||||
alarm.setAckTs(alarmUpdateMsg.getAckTs());
|
alarm.setAckTs(alarmUpdateMsg.getAckTs());
|
||||||
alarm.setEndTs(alarmUpdateMsg.getEndTs());
|
alarm.setEndTs(alarmUpdateMsg.getEndTs());
|
||||||
alarm.setDetails(JacksonUtil.OBJECT_MAPPER.readTree(alarmUpdateMsg.getDetails()));
|
alarm.setDetails(JacksonUtil.toJsonNode(alarmUpdateMsg.getDetails()));
|
||||||
if (UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE.equals(alarmUpdateMsg.getMsgType())) {
|
if (UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE.equals(alarmUpdateMsg.getMsgType())) {
|
||||||
alarmService.createAlarm(AlarmCreateOrUpdateActiveRequest.fromAlarm(alarm, null, alarmId));
|
alarmService.createAlarm(AlarmCreateOrUpdateActiveRequest.fromAlarm(alarm, null, alarmId));
|
||||||
} else {
|
} else {
|
||||||
@ -90,7 +89,7 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor {
|
|||||||
Alarm alarmToClear = alarmService.findAlarmById(tenantId, alarmId);
|
Alarm alarmToClear = alarmService.findAlarmById(tenantId, alarmId);
|
||||||
if (alarmToClear != null) {
|
if (alarmToClear != null) {
|
||||||
alarmService.clearAlarm(tenantId, alarmId, alarmUpdateMsg.getClearTs(),
|
alarmService.clearAlarm(tenantId, alarmId, alarmUpdateMsg.getClearTs(),
|
||||||
JacksonUtil.OBJECT_MAPPER.readTree(alarmUpdateMsg.getDetails()));
|
JacksonUtil.toJsonNode(alarmUpdateMsg.getDetails()));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case ENTITY_DELETED_RPC_MESSAGE:
|
case ENTITY_DELETED_RPC_MESSAGE:
|
||||||
@ -106,26 +105,11 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[{}] Failed to process alarm update msg [{}]", tenantId, alarmUpdateMsg, e);
|
log.error("[{}] Failed to process alarm update msg [{}]", tenantId, alarmUpdateMsg, e);
|
||||||
return Futures.immediateFailedFuture(e);
|
return Futures.immediateFailedFuture(e);
|
||||||
} finally {
|
|
||||||
edgeSynchronizationManager.getSync().remove();
|
|
||||||
}
|
}
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private EntityId getAlarmOriginator(TenantId tenantId, String entityName, EntityType entityType) {
|
protected AlarmUpdateMsg convertAlarmEventToAlarmMsg(TenantId tenantId, UUID entityId, EdgeEventActionType actionType, JsonNode body) {
|
||||||
switch (entityType) {
|
|
||||||
case DEVICE:
|
|
||||||
return deviceService.findDeviceByTenantIdAndName(tenantId, entityName).getId();
|
|
||||||
case ASSET:
|
|
||||||
return assetService.findAssetByTenantIdAndName(tenantId, entityName).getId();
|
|
||||||
case ENTITY_VIEW:
|
|
||||||
return entityViewService.findEntityViewByTenantIdAndName(tenantId, entityName).getId();
|
|
||||||
default:
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public AlarmUpdateMsg convertAlarmEventToAlarmMsg(TenantId tenantId, UUID entityId, EdgeEventActionType actionType, JsonNode body) {
|
|
||||||
AlarmId alarmId = new AlarmId(entityId);
|
AlarmId alarmId = new AlarmId(entityId);
|
||||||
UpdateMsgType msgType = getUpdateMsgType(actionType);
|
UpdateMsgType msgType = getUpdateMsgType(actionType);
|
||||||
switch (actionType) {
|
switch (actionType) {
|
||||||
@ -139,12 +123,25 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor {
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case DELETED:
|
case DELETED:
|
||||||
Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.convertValue(body, Alarm.class);
|
Alarm deletedAlarm = JacksonUtil.convertValue(body, Alarm.class);
|
||||||
return alarmMsgConstructor.constructAlarmUpdatedMsg(msgType, deletedAlarm, findOriginatorEntityName(tenantId, deletedAlarm));
|
return alarmMsgConstructor.constructAlarmUpdatedMsg(msgType, deletedAlarm, findOriginatorEntityName(tenantId, deletedAlarm));
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private EntityId getAlarmOriginator(TenantId tenantId, String entityName, EntityType entityType) {
|
||||||
|
switch (entityType) {
|
||||||
|
case DEVICE:
|
||||||
|
return deviceService.findDeviceByTenantIdAndName(tenantId, entityName).getId();
|
||||||
|
case ASSET:
|
||||||
|
return assetService.findAssetByTenantIdAndName(tenantId, entityName).getId();
|
||||||
|
case ENTITY_VIEW:
|
||||||
|
return entityViewService.findEntityViewByTenantIdAndName(tenantId, entityName).getId();
|
||||||
|
default:
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private String findOriginatorEntityName(TenantId tenantId, Alarm alarm) {
|
private String findOriginatorEntityName(TenantId tenantId, Alarm alarm) {
|
||||||
String entityName = null;
|
String entityName = null;
|
||||||
switch (alarm.getOriginator().getEntityType()) {
|
switch (alarm.getOriginator().getEntityType()) {
|
||||||
|
|||||||
@ -50,10 +50,10 @@ import java.util.UUID;
|
|||||||
public class AssetEdgeProcessor extends BaseAssetProcessor {
|
public class AssetEdgeProcessor extends BaseAssetProcessor {
|
||||||
|
|
||||||
public ListenableFuture<Void> processAssetMsgFromEdge(TenantId tenantId, Edge edge, AssetUpdateMsg assetUpdateMsg) {
|
public ListenableFuture<Void> processAssetMsgFromEdge(TenantId tenantId, Edge edge, AssetUpdateMsg assetUpdateMsg) {
|
||||||
log.trace("[{}] executing processAssetMsgFromEdge [{}] from edge [{}]", tenantId, assetUpdateMsg, edge.getName());
|
log.trace("[{}] executing processAssetMsgFromEdge [{}] from edge [{}]", tenantId, assetUpdateMsg, edge.getId());
|
||||||
AssetId assetId = new AssetId(new UUID(assetUpdateMsg.getIdMSB(), assetUpdateMsg.getIdLSB()));
|
AssetId assetId = new AssetId(new UUID(assetUpdateMsg.getIdMSB(), assetUpdateMsg.getIdLSB()));
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(true);
|
edgeSynchronizationManager.getEdgeId().set(edge.getId());
|
||||||
|
|
||||||
switch (assetUpdateMsg.getMsgType()) {
|
switch (assetUpdateMsg.getMsgType()) {
|
||||||
case ENTITY_CREATED_RPC_MESSAGE:
|
case ENTITY_CREATED_RPC_MESSAGE:
|
||||||
@ -78,7 +78,7 @@ public class AssetEdgeProcessor extends BaseAssetProcessor {
|
|||||||
return Futures.immediateFailedFuture(e);
|
return Futures.immediateFailedFuture(e);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
edgeSynchronizationManager.getSync().remove();
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -49,10 +49,10 @@ import java.util.UUID;
|
|||||||
public class AssetProfileEdgeProcessor extends BaseAssetProfileProcessor {
|
public class AssetProfileEdgeProcessor extends BaseAssetProfileProcessor {
|
||||||
|
|
||||||
public ListenableFuture<Void> processAssetProfileMsgFromEdge(TenantId tenantId, Edge edge, AssetProfileUpdateMsg assetProfileUpdateMsg) {
|
public ListenableFuture<Void> processAssetProfileMsgFromEdge(TenantId tenantId, Edge edge, AssetProfileUpdateMsg assetProfileUpdateMsg) {
|
||||||
log.trace("[{}] executing processAssetProfileMsgFromEdge [{}] from edge [{}]", tenantId, assetProfileUpdateMsg, edge.getName());
|
log.trace("[{}] executing processAssetProfileMsgFromEdge [{}] from edge [{}]", tenantId, assetProfileUpdateMsg, edge.getId());
|
||||||
AssetProfileId assetProfileId = new AssetProfileId(new UUID(assetProfileUpdateMsg.getIdMSB(), assetProfileUpdateMsg.getIdLSB()));
|
AssetProfileId assetProfileId = new AssetProfileId(new UUID(assetProfileUpdateMsg.getIdMSB(), assetProfileUpdateMsg.getIdLSB()));
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(true);
|
edgeSynchronizationManager.getEdgeId().set(edge.getId());
|
||||||
|
|
||||||
switch (assetProfileUpdateMsg.getMsgType()) {
|
switch (assetProfileUpdateMsg.getMsgType()) {
|
||||||
case ENTITY_CREATED_RPC_MESSAGE:
|
case ENTITY_CREATED_RPC_MESSAGE:
|
||||||
@ -68,7 +68,7 @@ public class AssetProfileEdgeProcessor extends BaseAssetProfileProcessor {
|
|||||||
log.warn("[{}] Failed to process AssetProfileUpdateMsg from Edge [{}]", tenantId, assetProfileUpdateMsg, e);
|
log.warn("[{}] Failed to process AssetProfileUpdateMsg from Edge [{}]", tenantId, assetProfileUpdateMsg, e);
|
||||||
return Futures.immediateFailedFuture(e);
|
return Futures.immediateFailedFuture(e);
|
||||||
} finally {
|
} finally {
|
||||||
edgeSynchronizationManager.getSync().remove();
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -45,10 +45,10 @@ import java.util.UUID;
|
|||||||
public class DashboardEdgeProcessor extends BaseDashboardProcessor {
|
public class DashboardEdgeProcessor extends BaseDashboardProcessor {
|
||||||
|
|
||||||
public ListenableFuture<Void> processDashboardMsgFromEdge(TenantId tenantId, Edge edge, DashboardUpdateMsg dashboardUpdateMsg) {
|
public ListenableFuture<Void> processDashboardMsgFromEdge(TenantId tenantId, Edge edge, DashboardUpdateMsg dashboardUpdateMsg) {
|
||||||
log.trace("[{}] executing processDashboardMsgFromEdge [{}] from edge [{}]", tenantId, dashboardUpdateMsg, edge.getName());
|
log.trace("[{}] executing processDashboardMsgFromEdge [{}] from edge [{}]", tenantId, dashboardUpdateMsg, edge.getId());
|
||||||
DashboardId dashboardId = new DashboardId(new UUID(dashboardUpdateMsg.getIdMSB(), dashboardUpdateMsg.getIdLSB()));
|
DashboardId dashboardId = new DashboardId(new UUID(dashboardUpdateMsg.getIdMSB(), dashboardUpdateMsg.getIdLSB()));
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(true);
|
edgeSynchronizationManager.getEdgeId().set(edge.getId());
|
||||||
|
|
||||||
switch (dashboardUpdateMsg.getMsgType()) {
|
switch (dashboardUpdateMsg.getMsgType()) {
|
||||||
case ENTITY_CREATED_RPC_MESSAGE:
|
case ENTITY_CREATED_RPC_MESSAGE:
|
||||||
@ -73,7 +73,7 @@ public class DashboardEdgeProcessor extends BaseDashboardProcessor {
|
|||||||
return Futures.immediateFailedFuture(e);
|
return Futures.immediateFailedFuture(e);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
edgeSynchronizationManager.getSync().remove();
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -16,7 +16,6 @@
|
|||||||
package org.thingsboard.server.service.edge.rpc.processor.device;
|
package org.thingsboard.server.service.edge.rpc.processor.device;
|
||||||
|
|
||||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
|
import com.datastax.oss.driver.api.core.uuid.Uuids;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.data.util.Pair;
|
import org.springframework.data.util.Pair;
|
||||||
@ -107,34 +106,27 @@ public abstract class BaseDeviceProcessor extends BaseEdgeProcessor {
|
|||||||
return Pair.of(created, deviceNameUpdated);
|
return Pair.of(created, deviceNameUpdated);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ListenableFuture<Void> processDeviceCredentialsMsg(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
|
protected void updateDeviceCredentials(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
|
||||||
log.debug("[{}] Executing processDeviceCredentialsMsg, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg);
|
|
||||||
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB()));
|
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB()));
|
||||||
return dbCallbackExecutorService.submit(() -> {
|
Device device = deviceService.findDeviceById(tenantId, deviceId);
|
||||||
Device device = deviceService.findDeviceById(tenantId, deviceId);
|
if (device != null) {
|
||||||
if (device != null) {
|
log.debug("[{}] Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]",
|
||||||
log.debug("[{}] Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]",
|
tenantId, device.getName(), deviceCredentialsUpdateMsg.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsValue());
|
||||||
tenantId, device.getName(), deviceCredentialsUpdateMsg.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsValue());
|
try {
|
||||||
try {
|
DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(tenantId, device.getId());
|
||||||
edgeSynchronizationManager.getSync().set(true);
|
deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(deviceCredentialsUpdateMsg.getCredentialsType()));
|
||||||
|
deviceCredentials.setCredentialsId(deviceCredentialsUpdateMsg.getCredentialsId());
|
||||||
|
deviceCredentials.setCredentialsValue(deviceCredentialsUpdateMsg.hasCredentialsValue()
|
||||||
|
? deviceCredentialsUpdateMsg.getCredentialsValue() : null);
|
||||||
|
deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials);
|
||||||
|
|
||||||
DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(tenantId, device.getId());
|
} catch (Exception e) {
|
||||||
deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(deviceCredentialsUpdateMsg.getCredentialsType()));
|
log.error("[{}] Can't update device credentials for device [{}], deviceCredentialsUpdateMsg [{}]",
|
||||||
deviceCredentials.setCredentialsId(deviceCredentialsUpdateMsg.getCredentialsId());
|
tenantId, device.getName(), deviceCredentialsUpdateMsg, e);
|
||||||
deviceCredentials.setCredentialsValue(deviceCredentialsUpdateMsg.hasCredentialsValue()
|
throw new RuntimeException(e);
|
||||||
? deviceCredentialsUpdateMsg.getCredentialsValue() : null);
|
|
||||||
deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("[{}] Can't update device credentials for device [{}], deviceCredentialsUpdateMsg [{}]",
|
|
||||||
tenantId, device.getName(), deviceCredentialsUpdateMsg, e);
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
} finally {
|
|
||||||
edgeSynchronizationManager.getSync().remove();
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.warn("[{}] Can't find device by id [{}], deviceCredentialsUpdateMsg [{}]", tenantId, deviceId, deviceCredentialsUpdateMsg);
|
|
||||||
}
|
}
|
||||||
return null;
|
} else {
|
||||||
});
|
log.warn("[{}] Can't find device by id [{}], deviceCredentialsUpdateMsg [{}]", tenantId, deviceId, deviceCredentialsUpdateMsg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,7 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.edge.rpc.processor.device;
|
package org.thingsboard.server.service.edge.rpc.processor.device;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
||||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
@ -65,10 +64,10 @@ import java.util.UUID;
|
|||||||
public class DeviceEdgeProcessor extends BaseDeviceProcessor {
|
public class DeviceEdgeProcessor extends BaseDeviceProcessor {
|
||||||
|
|
||||||
public ListenableFuture<Void> processDeviceMsgFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
|
public ListenableFuture<Void> processDeviceMsgFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
|
||||||
log.trace("[{}] executing processDeviceMsgFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName());
|
log.trace("[{}] executing processDeviceMsgFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getId());
|
||||||
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
|
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(true);
|
edgeSynchronizationManager.getEdgeId().set(edge.getId());
|
||||||
|
|
||||||
switch (deviceUpdateMsg.getMsgType()) {
|
switch (deviceUpdateMsg.getMsgType()) {
|
||||||
case ENTITY_CREATED_RPC_MESSAGE:
|
case ENTITY_CREATED_RPC_MESSAGE:
|
||||||
@ -93,10 +92,22 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor {
|
|||||||
return Futures.immediateFailedFuture(e);
|
return Futures.immediateFailedFuture(e);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
edgeSynchronizationManager.getSync().remove();
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ListenableFuture<Void> processDeviceCredentialsMsgFromEdge(TenantId tenantId, EdgeId edgeId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
|
||||||
|
log.debug("[{}] Executing processDeviceCredentialsMsgFromEdge, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg);
|
||||||
|
try {
|
||||||
|
edgeSynchronizationManager.getEdgeId().set(edgeId);
|
||||||
|
|
||||||
|
updateDeviceCredentials(tenantId, deviceCredentialsUpdateMsg);
|
||||||
|
} finally {
|
||||||
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
|
}
|
||||||
|
return Futures.immediateFuture(null);
|
||||||
|
}
|
||||||
|
|
||||||
private void saveOrUpdateDevice(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg, Edge edge) {
|
private void saveOrUpdateDevice(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg, Edge edge) {
|
||||||
CustomerId customerId = safeGetCustomerId(deviceUpdateMsg.getCustomerIdMSB(), deviceUpdateMsg.getCustomerIdLSB());
|
CustomerId customerId = safeGetCustomerId(deviceUpdateMsg.getCustomerIdMSB(), deviceUpdateMsg.getCustomerIdLSB());
|
||||||
Pair<Boolean, Boolean> resultPair = super.saveOrUpdateDevice(tenantId, deviceId, deviceUpdateMsg, customerId);
|
Pair<Boolean, Boolean> resultPair = super.saveOrUpdateDevice(tenantId, deviceId, deviceUpdateMsg, customerId);
|
||||||
@ -183,7 +194,7 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor {
|
|||||||
data.put("method", deviceRpcCallMsg.getRequestMsg().getMethod());
|
data.put("method", deviceRpcCallMsg.getRequestMsg().getMethod());
|
||||||
data.put("params", deviceRpcCallMsg.getRequestMsg().getParams());
|
data.put("params", deviceRpcCallMsg.getRequestMsg().getParams());
|
||||||
TbMsg tbMsg = TbMsg.newMsg(TbMsgType.TO_SERVER_RPC_REQUEST, deviceId, null, metaData,
|
TbMsg tbMsg = TbMsg.newMsg(TbMsgType.TO_SERVER_RPC_REQUEST, deviceId, null, metaData,
|
||||||
TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(data));
|
TbMsgDataType.JSON, JacksonUtil.toString(data));
|
||||||
tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, new TbQueueCallback() {
|
tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, new TbQueueCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(TbQueueMsgMetadata metadata) {
|
public void onSuccess(TbQueueMsgMetadata metadata) {
|
||||||
@ -197,7 +208,7 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor {
|
|||||||
tenantId, device, deviceRpcCallMsg, t);
|
tenantId, device, deviceRpcCallMsg, t);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (JsonProcessingException | IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
log.warn("[{}][{}] Failed to push TO_SERVER_RPC_REQUEST to rule engine. deviceRpcCallMsg {}", tenantId, deviceId, deviceRpcCallMsg, e);
|
log.warn("[{}][{}] Failed to push TO_SERVER_RPC_REQUEST to rule engine. deviceRpcCallMsg {}", tenantId, deviceId, deviceRpcCallMsg, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -49,10 +49,10 @@ import java.util.UUID;
|
|||||||
public class DeviceProfileEdgeProcessor extends BaseDeviceProfileProcessor {
|
public class DeviceProfileEdgeProcessor extends BaseDeviceProfileProcessor {
|
||||||
|
|
||||||
public ListenableFuture<Void> processDeviceProfileMsgFromEdge(TenantId tenantId, Edge edge, DeviceProfileUpdateMsg deviceProfileUpdateMsg) {
|
public ListenableFuture<Void> processDeviceProfileMsgFromEdge(TenantId tenantId, Edge edge, DeviceProfileUpdateMsg deviceProfileUpdateMsg) {
|
||||||
log.trace("[{}] executing processDeviceProfileMsgFromEdge [{}] from edge [{}]", tenantId, deviceProfileUpdateMsg, edge.getName());
|
log.trace("[{}] executing processDeviceProfileMsgFromEdge [{}] from edge [{}]", tenantId, deviceProfileUpdateMsg, edge.getId());
|
||||||
DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileUpdateMsg.getIdMSB(), deviceProfileUpdateMsg.getIdLSB()));
|
DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileUpdateMsg.getIdMSB(), deviceProfileUpdateMsg.getIdLSB()));
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(true);
|
edgeSynchronizationManager.getEdgeId().set(edge.getId());
|
||||||
|
|
||||||
switch (deviceProfileUpdateMsg.getMsgType()) {
|
switch (deviceProfileUpdateMsg.getMsgType()) {
|
||||||
case ENTITY_CREATED_RPC_MESSAGE:
|
case ENTITY_CREATED_RPC_MESSAGE:
|
||||||
@ -68,7 +68,7 @@ public class DeviceProfileEdgeProcessor extends BaseDeviceProfileProcessor {
|
|||||||
log.warn("[{}] Failed to process DeviceProfileUpdateMsg from Edge [{}]", tenantId, deviceProfileUpdateMsg, e);
|
log.warn("[{}] Failed to process DeviceProfileUpdateMsg from Edge [{}]", tenantId, deviceProfileUpdateMsg, e);
|
||||||
return Futures.immediateFailedFuture(e);
|
return Futures.immediateFailedFuture(e);
|
||||||
} finally {
|
} finally {
|
||||||
edgeSynchronizationManager.getSync().remove();
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -72,9 +72,9 @@ public class EdgeProcessor extends BaseEdgeProcessor {
|
|||||||
EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
|
EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
|
||||||
switch (actionType) {
|
switch (actionType) {
|
||||||
case ASSIGNED_TO_CUSTOMER:
|
case ASSIGNED_TO_CUSTOMER:
|
||||||
CustomerId customerId = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), CustomerId.class);
|
CustomerId customerId = JacksonUtil.fromString(edgeNotificationMsg.getBody(), CustomerId.class);
|
||||||
Edge edge = edgeService.findEdgeById(tenantId, edgeId);
|
Edge edge = edgeService.findEdgeById(tenantId, edgeId);
|
||||||
if (edge == null || customerId.isNullUid()) {
|
if (customerId != null && (edge == null || customerId.isNullUid())) {
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||||
@ -96,9 +96,9 @@ public class EdgeProcessor extends BaseEdgeProcessor {
|
|||||||
} while (pageData != null && pageData.hasNext());
|
} while (pageData != null && pageData.hasNext());
|
||||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||||
case UNASSIGNED_FROM_CUSTOMER:
|
case UNASSIGNED_FROM_CUSTOMER:
|
||||||
CustomerId customerIdToDelete = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), CustomerId.class);
|
CustomerId customerIdToDelete = JacksonUtil.fromString(edgeNotificationMsg.getBody(), CustomerId.class);
|
||||||
edge = edgeService.findEdgeById(tenantId, edgeId);
|
edge = edgeService.findEdgeById(tenantId, edgeId);
|
||||||
if (edge == null || customerIdToDelete.isNullUid()) {
|
if (customerIdToDelete != null && (edge == null || customerIdToDelete.isNullUid())) {
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
return Futures.transformAsync(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.EDGE, EdgeEventActionType.UNASSIGNED_FROM_CUSTOMER, edgeId, null),
|
return Futures.transformAsync(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.EDGE, EdgeEventActionType.UNASSIGNED_FROM_CUSTOMER, edgeId, null),
|
||||||
|
|||||||
@ -46,10 +46,10 @@ import java.util.UUID;
|
|||||||
public class EntityViewEdgeProcessor extends BaseEntityViewProcessor {
|
public class EntityViewEdgeProcessor extends BaseEntityViewProcessor {
|
||||||
|
|
||||||
public ListenableFuture<Void> processEntityViewMsgFromEdge(TenantId tenantId, Edge edge, EntityViewUpdateMsg entityViewUpdateMsg) {
|
public ListenableFuture<Void> processEntityViewMsgFromEdge(TenantId tenantId, Edge edge, EntityViewUpdateMsg entityViewUpdateMsg) {
|
||||||
log.trace("[{}] executing processEntityViewMsgFromEdge [{}] from edge [{}]", tenantId, entityViewUpdateMsg, edge.getName());
|
log.trace("[{}] executing processEntityViewMsgFromEdge [{}] from edge [{}]", tenantId, entityViewUpdateMsg, edge.getId());
|
||||||
EntityViewId entityViewId = new EntityViewId(new UUID(entityViewUpdateMsg.getIdMSB(), entityViewUpdateMsg.getIdLSB()));
|
EntityViewId entityViewId = new EntityViewId(new UUID(entityViewUpdateMsg.getIdMSB(), entityViewUpdateMsg.getIdLSB()));
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(true);
|
edgeSynchronizationManager.getEdgeId().set(edge.getId());
|
||||||
|
|
||||||
switch (entityViewUpdateMsg.getMsgType()) {
|
switch (entityViewUpdateMsg.getMsgType()) {
|
||||||
case ENTITY_CREATED_RPC_MESSAGE:
|
case ENTITY_CREATED_RPC_MESSAGE:
|
||||||
@ -74,7 +74,7 @@ public class EntityViewEdgeProcessor extends BaseEntityViewProcessor {
|
|||||||
return Futures.immediateFailedFuture(e);
|
return Futures.immediateFailedFuture(e);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
edgeSynchronizationManager.getSync().remove();
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -33,10 +33,8 @@ import java.util.UUID;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public abstract class BaseRelationProcessor extends BaseEdgeProcessor {
|
public abstract class BaseRelationProcessor extends BaseEdgeProcessor {
|
||||||
|
|
||||||
public ListenableFuture<Void> processRelationMsg(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) {
|
protected ListenableFuture<Void> processRelationMsg(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) {
|
||||||
log.trace("[{}] processRelationMsg [{}]", tenantId, relationUpdateMsg);
|
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(true);
|
|
||||||
EntityRelation entityRelation = new EntityRelation();
|
EntityRelation entityRelation = new EntityRelation();
|
||||||
|
|
||||||
UUID fromUUID = new UUID(relationUpdateMsg.getFromIdMSB(), relationUpdateMsg.getFromIdLSB());
|
UUID fromUUID = new UUID(relationUpdateMsg.getFromIdMSB(), relationUpdateMsg.getFromIdLSB());
|
||||||
@ -72,8 +70,6 @@ public abstract class BaseRelationProcessor extends BaseEdgeProcessor {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[{}] Failed to process relation update msg [{}]", tenantId, relationUpdateMsg, e);
|
log.error("[{}] Failed to process relation update msg [{}]", tenantId, relationUpdateMsg, e);
|
||||||
return Futures.immediateFailedFuture(e);
|
return Futures.immediateFailedFuture(e);
|
||||||
} finally {
|
|
||||||
edgeSynchronizationManager.getSync().remove();
|
|
||||||
}
|
}
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,7 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.edge.rpc.processor.relation;
|
package org.thingsboard.server.service.edge.rpc.processor.relation;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -23,6 +22,7 @@ import org.springframework.stereotype.Component;
|
|||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.server.common.data.EdgeUtils;
|
import org.thingsboard.server.common.data.EdgeUtils;
|
||||||
import org.thingsboard.server.common.data.EntityType;
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
|
import org.thingsboard.server.common.data.edge.Edge;
|
||||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||||
@ -45,8 +45,19 @@ import java.util.Set;
|
|||||||
@TbCoreComponent
|
@TbCoreComponent
|
||||||
public class RelationEdgeProcessor extends BaseRelationProcessor {
|
public class RelationEdgeProcessor extends BaseRelationProcessor {
|
||||||
|
|
||||||
|
public ListenableFuture<Void> processRelationMsgFromEdge(TenantId tenantId, Edge edge, RelationUpdateMsg relationUpdateMsg) {
|
||||||
|
log.trace("[{}] executing processRelationMsgFromEdge [{}] from edge [{}]", tenantId, relationUpdateMsg, edge.getId());
|
||||||
|
try {
|
||||||
|
edgeSynchronizationManager.getEdgeId().set(edge.getId());
|
||||||
|
|
||||||
|
return processRelationMsg(tenantId, relationUpdateMsg);
|
||||||
|
} finally {
|
||||||
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public DownlinkMsg convertRelationEventToDownlink(EdgeEvent edgeEvent) {
|
public DownlinkMsg convertRelationEventToDownlink(EdgeEvent edgeEvent) {
|
||||||
EntityRelation entityRelation = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), EntityRelation.class);
|
EntityRelation entityRelation = JacksonUtil.convertValue(edgeEvent.getBody(), EntityRelation.class);
|
||||||
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
|
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
|
||||||
RelationUpdateMsg relationUpdateMsg = relationMsgConstructor.constructRelationUpdatedMsg(msgType, entityRelation);
|
RelationUpdateMsg relationUpdateMsg = relationMsgConstructor.constructRelationUpdatedMsg(msgType, entityRelation);
|
||||||
return DownlinkMsg.newBuilder()
|
return DownlinkMsg.newBuilder()
|
||||||
@ -55,10 +66,9 @@ public class RelationEdgeProcessor extends BaseRelationProcessor {
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ListenableFuture<Void> processRelationNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException {
|
public ListenableFuture<Void> processRelationNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
||||||
EntityRelation relation = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), EntityRelation.class);
|
EntityRelation relation = JacksonUtil.fromString(edgeNotificationMsg.getBody(), EntityRelation.class);
|
||||||
if (relation.getFrom().getEntityType().equals(EntityType.EDGE) ||
|
if (relation == null || (relation.getFrom().getEntityType().equals(EntityType.EDGE) || relation.getTo().getEntityType().equals(EntityType.EDGE))) {
|
||||||
relation.getTo().getEntityType().equals(EntityType.EDGE)) {
|
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,7 +85,7 @@ public class RelationEdgeProcessor extends BaseRelationProcessor {
|
|||||||
EdgeEventType.RELATION,
|
EdgeEventType.RELATION,
|
||||||
EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()),
|
EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()),
|
||||||
null,
|
null,
|
||||||
JacksonUtil.OBJECT_MAPPER.valueToTree(relation)));
|
JacksonUtil.valueToTree(relation)));
|
||||||
}
|
}
|
||||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -32,7 +32,10 @@ import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
|||||||
public class AdminSettingsEdgeProcessor extends BaseEdgeProcessor {
|
public class AdminSettingsEdgeProcessor extends BaseEdgeProcessor {
|
||||||
|
|
||||||
public DownlinkMsg convertAdminSettingsEventToDownlink(EdgeEvent edgeEvent) {
|
public DownlinkMsg convertAdminSettingsEventToDownlink(EdgeEvent edgeEvent) {
|
||||||
AdminSettings adminSettings = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), AdminSettings.class);
|
AdminSettings adminSettings = JacksonUtil.convertValue(edgeEvent.getBody(), AdminSettings.class);
|
||||||
|
if (adminSettings == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
AdminSettingsUpdateMsg adminSettingsUpdateMsg = adminSettingsMsgConstructor.constructAdminSettingsUpdateMsg(adminSettings);
|
AdminSettingsUpdateMsg adminSettingsUpdateMsg = adminSettingsMsgConstructor.constructAdminSettingsUpdateMsg(adminSettings);
|
||||||
return DownlinkMsg.newBuilder()
|
return DownlinkMsg.newBuilder()
|
||||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||||
|
|||||||
@ -22,7 +22,6 @@ import com.google.common.util.concurrent.Futures;
|
|||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
import com.google.gson.Gson;
|
import com.google.gson.Gson;
|
||||||
import com.google.gson.JsonElement;
|
|
||||||
import com.google.gson.JsonObject;
|
import com.google.gson.JsonObject;
|
||||||
import com.google.gson.JsonParser;
|
import com.google.gson.JsonParser;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -346,8 +345,9 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
|
|||||||
log.warn("[{}] Unsupported edge event type [{}]", tenantId, entityType);
|
log.warn("[{}] Unsupported edge event type [{}]", tenantId, entityType);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
JsonElement entityData = JsonParser.parseString(JacksonUtil.OBJECT_MAPPER.writeValueAsString(body));
|
String bodyJackson = JacksonUtil.toString(body);
|
||||||
return entityDataMsgConstructor.constructEntityDataMsg(tenantId, entityId, actionType, entityData);
|
return bodyJackson == null ? null :
|
||||||
|
entityDataMsgConstructor.constructEntityDataMsg(tenantId, entityId, actionType, JsonParser.parseString(bodyJackson));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -176,7 +176,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
|||||||
if (attributes.size() > 0) {
|
if (attributes.size() > 0) {
|
||||||
entityData.put("kv", attributes);
|
entityData.put("kv", attributes);
|
||||||
entityData.put("scope", scope);
|
entityData.put("scope", scope);
|
||||||
JsonNode body = JacksonUtil.OBJECT_MAPPER.valueToTree(entityData);
|
JsonNode body = JacksonUtil.valueToTree(entityData);
|
||||||
log.debug("[{}] Sending attributes data msg, entityId [{}], attributes [{}]", tenantId, entityId, body);
|
log.debug("[{}] Sending attributes data msg, entityId [{}], attributes [{}]", tenantId, entityId, body);
|
||||||
future = saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body);
|
future = saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body);
|
||||||
} else {
|
} else {
|
||||||
@ -249,7 +249,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
|||||||
EdgeEventType.RELATION,
|
EdgeEventType.RELATION,
|
||||||
EdgeEventActionType.ADDED,
|
EdgeEventActionType.ADDED,
|
||||||
null,
|
null,
|
||||||
JacksonUtil.OBJECT_MAPPER.valueToTree(relation)));
|
JacksonUtil.valueToTree(relation)));
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String errMsg = String.format("[%s][%s] Exception during loading relation [%s] to edge on sync!", tenantId, edge.getId(), relation);
|
String errMsg = String.format("[%s][%s] Exception during loading relation [%s] to edge on sync!", tenantId, edge.getId(), relation);
|
||||||
|
|||||||
@ -475,7 +475,7 @@ public class DefaultTbClusterService implements TbClusterService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action) {
|
public void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action, EdgeId sourceEdgeId) {
|
||||||
if (!edgesEnabled) {
|
if (!edgesEnabled) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -508,6 +508,10 @@ public class DefaultTbClusterService implements TbClusterService {
|
|||||||
if (body != null) {
|
if (body != null) {
|
||||||
builder.setBody(body);
|
builder.setBody(body);
|
||||||
}
|
}
|
||||||
|
if (sourceEdgeId != null) {
|
||||||
|
builder.setSourceEdgeIdMSB(sourceEdgeId.getId().getMostSignificantBits());
|
||||||
|
builder.setSourceEdgeIdLSB(sourceEdgeId.getId().getLeastSignificantBits());
|
||||||
|
}
|
||||||
TransportProtos.EdgeNotificationMsgProto msg = builder.build();
|
TransportProtos.EdgeNotificationMsgProto msg = builder.build();
|
||||||
log.trace("[{}] sending notification to edge service {}", tenantId.getId(), msg);
|
log.trace("[{}] sending notification to edge service {}", tenantId.getId(), msg);
|
||||||
pushMsgToCore(tenantId, entityId != null ? entityId : tenantId, TransportProtos.ToCoreMsg.newBuilder().setEdgeNotificationMsg(msg).build(), null);
|
pushMsgToCore(tenantId, entityId != null ? entityId : tenantId, TransportProtos.ToCoreMsg.newBuilder().setEdgeNotificationMsg(msg).build(), null);
|
||||||
|
|||||||
@ -91,7 +91,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
|
|||||||
int cntTime = 1;
|
int cntTime = 1;
|
||||||
Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId),
|
Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId),
|
||||||
Mockito.isNull(), Mockito.isNull(), Mockito.any(), Mockito.eq(EdgeEventType.RELATION),
|
Mockito.isNull(), Mockito.isNull(), Mockito.any(), Mockito.eq(EdgeEventType.RELATION),
|
||||||
Mockito.eq(edgeTypeByActionType(actionType)));
|
Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any());
|
||||||
ArgumentMatcher<EntityId> matcherOriginatorId = argument -> argument.equals(relation.getTo());
|
ArgumentMatcher<EntityId> matcherOriginatorId = argument -> argument.equals(relation.getTo());
|
||||||
ArgumentMatcher<HasName> matcherEntityClassEquals = Objects::isNull;
|
ArgumentMatcher<HasName> matcherEntityClassEquals = Objects::isNull;
|
||||||
ArgumentMatcher<CustomerId> matcherCustomerId = customerId == null ?
|
ArgumentMatcher<CustomerId> matcherCustomerId = customerId == null ?
|
||||||
@ -111,7 +111,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
|
|||||||
ActionType actionType, int cntTime) {
|
ActionType actionType, int cntTime) {
|
||||||
Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId),
|
Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId),
|
||||||
Mockito.isNull(), Mockito.isNull(), Mockito.any(), Mockito.eq(EdgeEventType.RELATION),
|
Mockito.isNull(), Mockito.isNull(), Mockito.any(), Mockito.eq(EdgeEventType.RELATION),
|
||||||
Mockito.eq(edgeTypeByActionType(actionType)));
|
Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any());
|
||||||
ArgumentMatcher<EntityId> matcherOriginatorId = argument -> argument.getClass().equals(relation.getFrom().getClass());
|
ArgumentMatcher<EntityId> matcherOriginatorId = argument -> argument.getClass().equals(relation.getFrom().getClass());
|
||||||
ArgumentMatcher<HasName> matcherEntityClassEquals = Objects::isNull;
|
ArgumentMatcher<HasName> matcherEntityClassEquals = Objects::isNull;
|
||||||
ArgumentMatcher<CustomerId> matcherCustomerId = customerId == null ?
|
ArgumentMatcher<CustomerId> matcherCustomerId = customerId == null ?
|
||||||
@ -318,13 +318,13 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
|
|||||||
private void testNotificationMsgToEdgeServiceNeverWithActionType(EntityId entityId, ActionType actionType) {
|
private void testNotificationMsgToEdgeServiceNeverWithActionType(EntityId entityId, ActionType actionType) {
|
||||||
EdgeEventActionType edgeEventActionType = ActionType.CREDENTIALS_UPDATED.equals(actionType) ?
|
EdgeEventActionType edgeEventActionType = ActionType.CREDENTIALS_UPDATED.equals(actionType) ?
|
||||||
EdgeEventActionType.CREDENTIALS_UPDATED : edgeTypeByActionType(actionType);
|
EdgeEventActionType.CREDENTIALS_UPDATED : edgeTypeByActionType(actionType);
|
||||||
Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(),
|
Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(), Mockito.any(),
|
||||||
Mockito.any(), Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.eq(edgeEventActionType));
|
Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.eq(edgeEventActionType), Mockito.any());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testNotificationMsgToEdgeServiceNever(EntityId entityId) {
|
private void testNotificationMsgToEdgeServiceNever(EntityId entityId) {
|
||||||
Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(),
|
Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(), Mockito.any(),
|
||||||
Mockito.any(), Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.any());
|
Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testLogEntityActionNever(EntityId entityId, HasName entity) {
|
private void testLogEntityActionNever(EntityId entityId, HasName entity) {
|
||||||
@ -358,13 +358,13 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
|
|||||||
argument -> argument.getClass().equals(entityId.getClass());
|
argument -> argument.getClass().equals(entityId.getClass());
|
||||||
Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId),
|
Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId),
|
||||||
Mockito.any(), Mockito.argThat(matcherEntityId), Mockito.any(), Mockito.isNull(),
|
Mockito.any(), Mockito.argThat(matcherEntityId), Mockito.any(), Mockito.isNull(),
|
||||||
Mockito.eq(edgeEventActionType));
|
Mockito.eq(edgeEventActionType), Mockito.any());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testSendNotificationMsgToEdgeServiceTimeEntityEqAny(TenantId tenantId, ActionType actionType, int cntTime) {
|
private void testSendNotificationMsgToEdgeServiceTimeEntityEqAny(TenantId tenantId, ActionType actionType, int cntTime) {
|
||||||
Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId),
|
Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId),
|
||||||
Mockito.any(), Mockito.any(EntityId.class), Mockito.any(), Mockito.isNull(),
|
Mockito.any(), Mockito.any(EntityId.class), Mockito.any(), Mockito.isNull(),
|
||||||
Mockito.eq(edgeTypeByActionType(actionType)));
|
Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void testBroadcastEntityStateChangeEventTime(EntityId entityId, TenantId tenantId, int cntTime) {
|
protected void testBroadcastEntityStateChangeEventTime(EntityId entityId, TenantId tenantId, int cntTime) {
|
||||||
|
|||||||
@ -553,9 +553,9 @@ public class DeviceEdgeTest extends AbstractEdgeTest {
|
|||||||
Assert.assertTrue(edgeImitator.waitForResponses());
|
Assert.assertTrue(edgeImitator.waitForResponses());
|
||||||
Assert.assertTrue(edgeImitator.waitForMessages());
|
Assert.assertTrue(edgeImitator.waitForMessages());
|
||||||
|
|
||||||
AbstractMessage latestMessage = edgeImitator.getLatestMessage();
|
Optional<DeviceCredentialsRequestMsg> deviceCredentialsRequestMsgOpt = edgeImitator.findMessageByType(DeviceCredentialsRequestMsg.class);
|
||||||
Assert.assertTrue(latestMessage instanceof DeviceCredentialsRequestMsg);
|
Assert.assertTrue(deviceCredentialsRequestMsgOpt.isPresent());
|
||||||
DeviceCredentialsRequestMsg latestDeviceCredentialsRequestMsg = (DeviceCredentialsRequestMsg) latestMessage;
|
DeviceCredentialsRequestMsg latestDeviceCredentialsRequestMsg = deviceCredentialsRequestMsgOpt.get();
|
||||||
Assert.assertEquals(uuid.getMostSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdMSB());
|
Assert.assertEquals(uuid.getMostSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdMSB());
|
||||||
Assert.assertEquals(uuid.getLeastSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB());
|
Assert.assertEquals(uuid.getLeastSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB());
|
||||||
|
|
||||||
|
|||||||
@ -592,7 +592,7 @@ public class ExportImportServiceSqlTest extends BaseExportImportServiceTest {
|
|||||||
Customer updatedCustomer = importEntity(tenantAdmin2, updatedCustomerEntity).getSavedEntity();
|
Customer updatedCustomer = importEntity(tenantAdmin2, updatedCustomerEntity).getSavedEntity();
|
||||||
verify(entityActionService).logEntityAction(any(), eq(importedCustomer.getId()), eq(updatedCustomer),
|
verify(entityActionService).logEntityAction(any(), eq(importedCustomer.getId()), eq(updatedCustomer),
|
||||||
any(), eq(ActionType.UPDATED), isNull());
|
any(), eq(ActionType.UPDATED), isNull());
|
||||||
verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedCustomer.getId()), any(), any(), eq(EdgeEventActionType.UPDATED));
|
verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedCustomer.getId()), any(), any(), eq(EdgeEventActionType.UPDATED), any());
|
||||||
|
|
||||||
Mockito.reset(entityActionService);
|
Mockito.reset(entityActionService);
|
||||||
|
|
||||||
@ -609,7 +609,7 @@ public class ExportImportServiceSqlTest extends BaseExportImportServiceTest {
|
|||||||
verify(entityActionService).logEntityAction(any(), eq(importedAssetProfile.getId()), eq(importedAssetProfile),
|
verify(entityActionService).logEntityAction(any(), eq(importedAssetProfile.getId()), eq(importedAssetProfile),
|
||||||
any(), eq(ActionType.ADDED), isNull());
|
any(), eq(ActionType.ADDED), isNull());
|
||||||
verify(tbClusterService).broadcastEntityStateChangeEvent(any(), eq(importedAssetProfile.getId()), eq(ComponentLifecycleEvent.CREATED));
|
verify(tbClusterService).broadcastEntityStateChangeEvent(any(), eq(importedAssetProfile.getId()), eq(ComponentLifecycleEvent.CREATED));
|
||||||
verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedAssetProfile.getId()), any(), any(), eq(EdgeEventActionType.ADDED));
|
verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedAssetProfile.getId()), any(), any(), eq(EdgeEventActionType.ADDED), any());
|
||||||
|
|
||||||
Asset importedAsset = (Asset) importEntity(tenantAdmin2, getAndClone(entitiesExportData, EntityType.ASSET)).getSavedEntity();
|
Asset importedAsset = (Asset) importEntity(tenantAdmin2, getAndClone(entitiesExportData, EntityType.ASSET)).getSavedEntity();
|
||||||
verify(entityActionService).logEntityAction(any(), eq(importedAsset.getId()), eq(importedAsset),
|
verify(entityActionService).logEntityAction(any(), eq(importedAsset.getId()), eq(importedAsset),
|
||||||
@ -625,14 +625,14 @@ public class ExportImportServiceSqlTest extends BaseExportImportServiceTest {
|
|||||||
|
|
||||||
verify(entityActionService).logEntityAction(any(), eq(importedAsset.getId()), eq(updatedAsset),
|
verify(entityActionService).logEntityAction(any(), eq(importedAsset.getId()), eq(updatedAsset),
|
||||||
any(), eq(ActionType.UPDATED), isNull());
|
any(), eq(ActionType.UPDATED), isNull());
|
||||||
verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedAsset.getId()), any(), any(), eq(EdgeEventActionType.UPDATED));
|
verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedAsset.getId()), any(), any(), eq(EdgeEventActionType.UPDATED), any());
|
||||||
|
|
||||||
DeviceProfile importedDeviceProfile = (DeviceProfile) importEntity(tenantAdmin2, getAndClone(entitiesExportData, EntityType.DEVICE_PROFILE)).getSavedEntity();
|
DeviceProfile importedDeviceProfile = (DeviceProfile) importEntity(tenantAdmin2, getAndClone(entitiesExportData, EntityType.DEVICE_PROFILE)).getSavedEntity();
|
||||||
verify(entityActionService).logEntityAction(any(), eq(importedDeviceProfile.getId()), eq(importedDeviceProfile),
|
verify(entityActionService).logEntityAction(any(), eq(importedDeviceProfile.getId()), eq(importedDeviceProfile),
|
||||||
any(), eq(ActionType.ADDED), isNull());
|
any(), eq(ActionType.ADDED), isNull());
|
||||||
verify(tbClusterService).onDeviceProfileChange(eq(importedDeviceProfile), any());
|
verify(tbClusterService).onDeviceProfileChange(eq(importedDeviceProfile), any());
|
||||||
verify(tbClusterService).broadcastEntityStateChangeEvent(any(), eq(importedDeviceProfile.getId()), eq(ComponentLifecycleEvent.CREATED));
|
verify(tbClusterService).broadcastEntityStateChangeEvent(any(), eq(importedDeviceProfile.getId()), eq(ComponentLifecycleEvent.CREATED));
|
||||||
verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedDeviceProfile.getId()), any(), any(), eq(EdgeEventActionType.ADDED));
|
verify(tbClusterService).sendNotificationMsgToEdge(any(), any(), eq(importedDeviceProfile.getId()), any(), any(), eq(EdgeEventActionType.ADDED), any());
|
||||||
verify(otaPackageStateService).update(eq(importedDeviceProfile), eq(false), eq(false));
|
verify(otaPackageStateService).update(eq(importedDeviceProfile), eq(false), eq(false));
|
||||||
|
|
||||||
Device importedDevice = (Device) importEntity(tenantAdmin2, getAndClone(entitiesExportData, EntityType.DEVICE)).getSavedEntity();
|
Device importedDevice = (Device) importEntity(tenantAdmin2, getAndClone(entitiesExportData, EntityType.DEVICE)).getSavedEntity();
|
||||||
|
|||||||
@ -92,6 +92,6 @@ public interface TbClusterService extends TbQueueClusterService {
|
|||||||
|
|
||||||
void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse);
|
void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse);
|
||||||
|
|
||||||
void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action);
|
void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action, EdgeId sourceEdgeId);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -799,6 +799,8 @@ message EdgeNotificationMsgProto {
|
|||||||
string body = 10;
|
string body = 10;
|
||||||
PostTelemetryMsg postTelemetryMsg = 11;
|
PostTelemetryMsg postTelemetryMsg = 11;
|
||||||
PostAttributeMsg postAttributesMsg = 12;
|
PostAttributeMsg postAttributesMsg = 12;
|
||||||
|
int64 sourceEdgeIdMSB = 13;
|
||||||
|
int64 sourceEdgeIdLSB = 14;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@ -15,9 +15,9 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.dao.edge;
|
package org.thingsboard.server.dao.edge;
|
||||||
|
|
||||||
|
import org.thingsboard.server.common.data.id.EdgeId;
|
||||||
|
|
||||||
public interface EdgeSynchronizationManager {
|
public interface EdgeSynchronizationManager {
|
||||||
|
|
||||||
ThreadLocal<Boolean> getSync();
|
ThreadLocal<EdgeId> getEdgeId();
|
||||||
|
|
||||||
boolean isSync();
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,17 +18,12 @@ package org.thingsboard.server.dao.edge;
|
|||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.thingsboard.server.common.data.id.EdgeId;
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class DefaultEdgeSynchronizationManager implements EdgeSynchronizationManager {
|
public class DefaultEdgeSynchronizationManager implements EdgeSynchronizationManager {
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
private final ThreadLocal<Boolean> sync = new ThreadLocal<>();
|
private final ThreadLocal<EdgeId> edgeId = new ThreadLocal<>();
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isSync() {
|
|
||||||
Boolean sync = this.sync.get();
|
|
||||||
return sync != null && sync;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -26,7 +26,6 @@ import org.thingsboard.server.common.data.id.TenantId;
|
|||||||
public class DeleteEntityEvent<T> {
|
public class DeleteEntityEvent<T> {
|
||||||
private final TenantId tenantId;
|
private final TenantId tenantId;
|
||||||
private final EntityId entityId;
|
private final EntityId entityId;
|
||||||
private final EdgeId edgeId;
|
|
||||||
private final T entity;
|
private final T entity;
|
||||||
|
|
||||||
@Builder.Default
|
@Builder.Default
|
||||||
|
|||||||
@ -104,7 +104,7 @@ public class TbSendRPCReplyNode implements TbNode {
|
|||||||
body.put("requestId", requestIdStr);
|
body.put("requestId", requestIdStr);
|
||||||
body.put("response", msg.getData());
|
body.put("response", msg.getData());
|
||||||
EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(ctx.getTenantId(), edgeId, EdgeEventType.DEVICE,
|
EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(ctx.getTenantId(), edgeId, EdgeEventType.DEVICE,
|
||||||
EdgeEventActionType.RPC_CALL, deviceId, JacksonUtil.OBJECT_MAPPER.valueToTree(body));
|
EdgeEventActionType.RPC_CALL, deviceId, JacksonUtil.valueToTree(body));
|
||||||
ListenableFuture<Void> future = ctx.getEdgeEventService().saveAsync(edgeEvent);
|
ListenableFuture<Void> future = ctx.getEdgeEventService().saveAsync(edgeEvent);
|
||||||
Futures.addCallback(future, new FutureCallback<>() {
|
Futures.addCallback(future, new FutureCallback<>() {
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user