Code review. Minor refactoring: Sync -> EdgeId. Logging updates. Avoid usage of OBJECT_MAPPER.
This commit is contained in:
parent
38dabc7f38
commit
919e0c5081
@ -77,7 +77,7 @@ public class EdgeEventSourcingListener {
|
|||||||
log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event);
|
log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event);
|
||||||
EdgeEventActionType action = Boolean.TRUE.equals(event.getAdded()) ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED;
|
EdgeEventActionType action = Boolean.TRUE.equals(event.getAdded()) ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED;
|
||||||
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(),
|
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(),
|
||||||
null, null, action, edgeSynchronizationManager.getEdgeId());
|
null, null, action, edgeSynchronizationManager.getEdgeId().get());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event);
|
log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event);
|
||||||
}
|
}
|
||||||
@ -89,7 +89,7 @@ public class EdgeEventSourcingListener {
|
|||||||
log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event);
|
log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event);
|
||||||
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(),
|
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(),
|
||||||
JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED,
|
JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED,
|
||||||
edgeSynchronizationManager.getEdgeId());
|
edgeSynchronizationManager.getEdgeId().get());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event);
|
log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event);
|
||||||
}
|
}
|
||||||
@ -101,7 +101,7 @@ public class EdgeEventSourcingListener {
|
|||||||
log.trace("[{}] ActionEntityEvent called: {}", event.getTenantId(), event);
|
log.trace("[{}] ActionEntityEvent called: {}", event.getTenantId(), event);
|
||||||
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(),
|
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(),
|
||||||
event.getBody(), null, edgeTypeByActionType(event.getActionType()),
|
event.getBody(), null, edgeTypeByActionType(event.getActionType()),
|
||||||
edgeSynchronizationManager.getEdgeId());
|
edgeSynchronizationManager.getEdgeId().get());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event);
|
log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event);
|
||||||
}
|
}
|
||||||
@ -122,7 +122,7 @@ public class EdgeEventSourcingListener {
|
|||||||
log.trace("[{}] RelationActionEvent called: {}", event.getTenantId(), event);
|
log.trace("[{}] RelationActionEvent called: {}", event.getTenantId(), event);
|
||||||
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, null,
|
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, null,
|
||||||
JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType()),
|
JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType()),
|
||||||
edgeSynchronizationManager.getEdgeId());
|
edgeSynchronizationManager.getEdgeId().get());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event);
|
log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,7 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.edge.rpc.constructor.rule;
|
package org.thingsboard.server.service.edge.rpc.constructor.rule;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
||||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -50,7 +49,7 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM
|
|||||||
constructRuleChainMetadataUpdatedMsg(tenantId, builder, ruleChainMetaData);
|
constructRuleChainMetadataUpdatedMsg(tenantId, builder, ruleChainMetaData);
|
||||||
builder.setMsgType(msgType);
|
builder.setMsgType(msgType);
|
||||||
return builder.build();
|
return builder.build();
|
||||||
} catch (JsonProcessingException ex) {
|
} catch (Exception ex) {
|
||||||
log.error("[{}] Can't construct RuleChainMetadataUpdateMsg", tenantId, ex);
|
log.error("[{}] Can't construct RuleChainMetadataUpdateMsg", tenantId, ex);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
@ -58,7 +57,7 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM
|
|||||||
|
|
||||||
protected abstract void constructRuleChainMetadataUpdatedMsg(TenantId tenantId,
|
protected abstract void constructRuleChainMetadataUpdatedMsg(TenantId tenantId,
|
||||||
RuleChainMetadataUpdateMsg.Builder builder,
|
RuleChainMetadataUpdateMsg.Builder builder,
|
||||||
RuleChainMetaData ruleChainMetaData) throws JsonProcessingException;
|
RuleChainMetaData ruleChainMetaData);
|
||||||
|
|
||||||
protected List<NodeConnectionInfoProto> constructConnections(List<NodeConnectionInfo> connections) {
|
protected List<NodeConnectionInfoProto> constructConnections(List<NodeConnectionInfo> connections) {
|
||||||
List<NodeConnectionInfoProto> result = new ArrayList<>();
|
List<NodeConnectionInfoProto> result = new ArrayList<>();
|
||||||
@ -78,7 +77,7 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<RuleNodeProto> constructNodes(List<RuleNode> nodes) throws JsonProcessingException {
|
protected List<RuleNodeProto> constructNodes(List<RuleNode> nodes) {
|
||||||
List<RuleNodeProto> result = new ArrayList<>();
|
List<RuleNodeProto> result = new ArrayList<>();
|
||||||
if (nodes != null && !nodes.isEmpty()) {
|
if (nodes != null && !nodes.isEmpty()) {
|
||||||
for (RuleNode node : nodes) {
|
for (RuleNode node : nodes) {
|
||||||
@ -88,22 +87,22 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private RuleNodeProto constructNode(RuleNode node) throws JsonProcessingException {
|
private RuleNodeProto constructNode(RuleNode node) {
|
||||||
return RuleNodeProto.newBuilder()
|
return RuleNodeProto.newBuilder()
|
||||||
.setIdMSB(node.getId().getId().getMostSignificantBits())
|
.setIdMSB(node.getId().getId().getMostSignificantBits())
|
||||||
.setIdLSB(node.getId().getId().getLeastSignificantBits())
|
.setIdLSB(node.getId().getId().getLeastSignificantBits())
|
||||||
.setType(node.getType())
|
.setType(node.getType())
|
||||||
.setName(node.getName())
|
.setName(node.getName())
|
||||||
.setDebugMode(node.isDebugMode())
|
.setDebugMode(node.isDebugMode())
|
||||||
.setConfiguration(JacksonUtil.OBJECT_MAPPER.writeValueAsString(node.getConfiguration()))
|
.setConfiguration(JacksonUtil.toString(node.getConfiguration()))
|
||||||
.setAdditionalInfo(JacksonUtil.OBJECT_MAPPER.writeValueAsString(node.getAdditionalInfo()))
|
.setAdditionalInfo(JacksonUtil.toString(node.getAdditionalInfo()))
|
||||||
.setSingletonMode(node.isSingletonMode())
|
.setSingletonMode(node.isSingletonMode())
|
||||||
.setConfigurationVersion(node.getConfigurationVersion())
|
.setConfigurationVersion(node.getConfigurationVersion())
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<RuleChainConnectionInfoProto> constructRuleChainConnections(List<RuleChainConnectionInfo> ruleChainConnections,
|
protected List<RuleChainConnectionInfoProto> constructRuleChainConnections(List<RuleChainConnectionInfo> ruleChainConnections,
|
||||||
NavigableSet<Integer> removedNodeIndexes) throws JsonProcessingException {
|
NavigableSet<Integer> removedNodeIndexes) {
|
||||||
List<RuleChainConnectionInfoProto> result = new ArrayList<>();
|
List<RuleChainConnectionInfoProto> result = new ArrayList<>();
|
||||||
if (ruleChainConnections != null && !ruleChainConnections.isEmpty()) {
|
if (ruleChainConnections != null && !ruleChainConnections.isEmpty()) {
|
||||||
for (RuleChainConnectionInfo ruleChainConnectionInfo : ruleChainConnections) {
|
for (RuleChainConnectionInfo ruleChainConnectionInfo : ruleChainConnections) {
|
||||||
@ -127,13 +126,13 @@ public abstract class AbstractRuleChainMetadataConstructor implements RuleChainM
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private RuleChainConnectionInfoProto constructRuleChainConnection(RuleChainConnectionInfo ruleChainConnectionInfo) throws JsonProcessingException {
|
private RuleChainConnectionInfoProto constructRuleChainConnection(RuleChainConnectionInfo ruleChainConnectionInfo) {
|
||||||
return RuleChainConnectionInfoProto.newBuilder()
|
return RuleChainConnectionInfoProto.newBuilder()
|
||||||
.setFromIndex(ruleChainConnectionInfo.getFromIndex())
|
.setFromIndex(ruleChainConnectionInfo.getFromIndex())
|
||||||
.setTargetRuleChainIdMSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getMostSignificantBits())
|
.setTargetRuleChainIdMSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getMostSignificantBits())
|
||||||
.setTargetRuleChainIdLSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getLeastSignificantBits())
|
.setTargetRuleChainIdLSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getLeastSignificantBits())
|
||||||
.setType(ruleChainConnectionInfo.getType())
|
.setType(ruleChainConnectionInfo.getType())
|
||||||
.setAdditionalInfo(JacksonUtil.OBJECT_MAPPER.writeValueAsString(ruleChainConnectionInfo.getAdditionalInfo()))
|
.setAdditionalInfo(JacksonUtil.toString(ruleChainConnectionInfo.getAdditionalInfo()))
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,7 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.edge.rpc.constructor.rule;
|
package org.thingsboard.server.service.edge.rpc.constructor.rule;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.rule.engine.flow.TbRuleChainInputNode;
|
import org.thingsboard.rule.engine.flow.TbRuleChainInputNode;
|
||||||
@ -45,7 +44,7 @@ public class RuleChainMetadataConstructorV330 extends AbstractRuleChainMetadataC
|
|||||||
@Override
|
@Override
|
||||||
protected void constructRuleChainMetadataUpdatedMsg(TenantId tenantId,
|
protected void constructRuleChainMetadataUpdatedMsg(TenantId tenantId,
|
||||||
RuleChainMetadataUpdateMsg.Builder builder,
|
RuleChainMetadataUpdateMsg.Builder builder,
|
||||||
RuleChainMetaData ruleChainMetaData) throws JsonProcessingException {
|
RuleChainMetaData ruleChainMetaData) {
|
||||||
List<RuleNode> supportedNodes = filterNodes(ruleChainMetaData.getNodes());
|
List<RuleNode> supportedNodes = filterNodes(ruleChainMetaData.getNodes());
|
||||||
|
|
||||||
NavigableSet<Integer> removedNodeIndexes = getRemovedNodeIndexes(ruleChainMetaData.getNodes(), ruleChainMetaData.getConnections());
|
NavigableSet<Integer> removedNodeIndexes = getRemovedNodeIndexes(ruleChainMetaData.getNodes(), ruleChainMetaData.getConnections());
|
||||||
|
|||||||
@ -15,7 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.edge.rpc.constructor.rule;
|
package org.thingsboard.server.service.edge.rpc.constructor.rule;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
|
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
|
||||||
@ -29,7 +28,7 @@ public class RuleChainMetadataConstructorV340 extends AbstractRuleChainMetadataC
|
|||||||
@Override
|
@Override
|
||||||
protected void constructRuleChainMetadataUpdatedMsg(TenantId tenantId,
|
protected void constructRuleChainMetadataUpdatedMsg(TenantId tenantId,
|
||||||
RuleChainMetadataUpdateMsg.Builder builder,
|
RuleChainMetadataUpdateMsg.Builder builder,
|
||||||
RuleChainMetaData ruleChainMetaData) throws JsonProcessingException {
|
RuleChainMetaData ruleChainMetaData) {
|
||||||
builder.addAllNodes(constructNodes(ruleChainMetaData.getNodes()))
|
builder.addAllNodes(constructNodes(ruleChainMetaData.getNodes()))
|
||||||
.addAllConnections(constructConnections(ruleChainMetaData.getConnections()))
|
.addAllConnections(constructConnections(ruleChainMetaData.getConnections()))
|
||||||
.addAllRuleChainConnections(constructRuleChainConnections(ruleChainMetaData.getRuleChainConnections(), new TreeSet<>()));
|
.addAllRuleChainConnections(constructRuleChainConnections(ruleChainMetaData.getRuleChainConnections(), new TreeSet<>()));
|
||||||
|
|||||||
@ -79,19 +79,19 @@ public class AdminSettingsEdgeEventFetcher implements EdgeEventFetcher {
|
|||||||
|
|
||||||
AdminSettings systemMailSettings = adminSettingsService.findAdminSettingsByKey(TenantId.SYS_TENANT_ID, "mail");
|
AdminSettings systemMailSettings = adminSettingsService.findAdminSettingsByKey(TenantId.SYS_TENANT_ID, "mail");
|
||||||
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
||||||
EdgeEventActionType.UPDATED, null, JacksonUtil.OBJECT_MAPPER.valueToTree(systemMailSettings)));
|
EdgeEventActionType.UPDATED, null, JacksonUtil.valueToTree(systemMailSettings)));
|
||||||
|
|
||||||
AdminSettings tenantMailSettings = convertToTenantAdminSettings(tenantId, systemMailSettings.getKey(), (ObjectNode) systemMailSettings.getJsonValue());
|
AdminSettings tenantMailSettings = convertToTenantAdminSettings(tenantId, systemMailSettings.getKey(), (ObjectNode) systemMailSettings.getJsonValue());
|
||||||
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
||||||
EdgeEventActionType.UPDATED, null, JacksonUtil.OBJECT_MAPPER.valueToTree(tenantMailSettings)));
|
EdgeEventActionType.UPDATED, null, JacksonUtil.valueToTree(tenantMailSettings)));
|
||||||
|
|
||||||
AdminSettings systemMailTemplates = loadMailTemplates(tenantId);
|
AdminSettings systemMailTemplates = loadMailTemplates(tenantId);
|
||||||
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
||||||
EdgeEventActionType.UPDATED, null, JacksonUtil.OBJECT_MAPPER.valueToTree(systemMailTemplates)));
|
EdgeEventActionType.UPDATED, null, JacksonUtil.valueToTree(systemMailTemplates)));
|
||||||
|
|
||||||
AdminSettings tenantMailTemplates = convertToTenantAdminSettings(tenantId, systemMailTemplates.getKey(), (ObjectNode) systemMailTemplates.getJsonValue());
|
AdminSettings tenantMailTemplates = convertToTenantAdminSettings(tenantId, systemMailTemplates.getKey(), (ObjectNode) systemMailTemplates.getJsonValue());
|
||||||
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
result.add(EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS,
|
||||||
EdgeEventActionType.UPDATED, null, JacksonUtil.OBJECT_MAPPER.valueToTree(tenantMailTemplates)));
|
EdgeEventActionType.UPDATED, null, JacksonUtil.valueToTree(tenantMailTemplates)));
|
||||||
|
|
||||||
// return PageData object to be in sync with other fetchers
|
// return PageData object to be in sync with other fetchers
|
||||||
return new PageData<>(result, 1, result.size(), false);
|
return new PageData<>(result, 1, result.size(), false);
|
||||||
@ -114,7 +114,7 @@ public class AdminSettingsEdgeEventFetcher implements EdgeEventFetcher {
|
|||||||
AdminSettings adminSettings = new AdminSettings();
|
AdminSettings adminSettings = new AdminSettings();
|
||||||
adminSettings.setId(new AdminSettingsId(Uuids.timeBased()));
|
adminSettings.setId(new AdminSettingsId(Uuids.timeBased()));
|
||||||
adminSettings.setKey("mailTemplates");
|
adminSettings.setKey("mailTemplates");
|
||||||
adminSettings.setJsonValue(JacksonUtil.OBJECT_MAPPER.convertValue(mailTemplates, JsonNode.class));
|
adminSettings.setJsonValue(JacksonUtil.convertValue(mailTemplates, JsonNode.class));
|
||||||
return adminSettings;
|
return adminSettings;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -431,6 +431,8 @@ public abstract class BaseEdgeProcessor {
|
|||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
}, dbCallbackExecutorService);
|
}, dbCallbackExecutorService);
|
||||||
|
} else {
|
||||||
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
|
|||||||
@ -50,10 +50,10 @@ public class AlarmEdgeProcessor extends BaseAlarmProcessor {
|
|||||||
public ListenableFuture<Void> processAlarmMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmUpdateMsg alarmUpdateMsg) {
|
public ListenableFuture<Void> processAlarmMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmUpdateMsg alarmUpdateMsg) {
|
||||||
log.trace("[{}] processAlarmMsgFromEdge [{}]", tenantId, alarmUpdateMsg);
|
log.trace("[{}] processAlarmMsgFromEdge [{}]", tenantId, alarmUpdateMsg);
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(edgeId);
|
edgeSynchronizationManager.getEdgeId().set(edgeId);
|
||||||
return processAlarmMsg(tenantId, alarmUpdateMsg);
|
return processAlarmMsg(tenantId, alarmUpdateMsg);
|
||||||
} finally {
|
} finally {
|
||||||
edgeSynchronizationManager.getSync().remove();
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -77,7 +77,7 @@ public class AlarmEdgeProcessor extends BaseAlarmProcessor {
|
|||||||
case DELETED:
|
case DELETED:
|
||||||
Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), Alarm.class);
|
Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), Alarm.class);
|
||||||
List<ListenableFuture<Void>> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(),
|
List<ListenableFuture<Void>> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(),
|
||||||
alarmId, actionType, JacksonUtil.OBJECT_MAPPER.valueToTree(deletedAlarm), sourceEdgeId);
|
alarmId, actionType, JacksonUtil.valueToTree(deletedAlarm), sourceEdgeId);
|
||||||
return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService);
|
return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService);
|
||||||
default:
|
default:
|
||||||
ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId);
|
ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId);
|
||||||
|
|||||||
@ -123,7 +123,7 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor {
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case DELETED:
|
case DELETED:
|
||||||
Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.convertValue(body, Alarm.class);
|
Alarm deletedAlarm = JacksonUtil.convertValue(body, Alarm.class);
|
||||||
return alarmMsgConstructor.constructAlarmUpdatedMsg(msgType, deletedAlarm, findOriginatorEntityName(tenantId, deletedAlarm));
|
return alarmMsgConstructor.constructAlarmUpdatedMsg(msgType, deletedAlarm, findOriginatorEntityName(tenantId, deletedAlarm));
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
|||||||
@ -50,10 +50,10 @@ import java.util.UUID;
|
|||||||
public class AssetEdgeProcessor extends BaseAssetProcessor {
|
public class AssetEdgeProcessor extends BaseAssetProcessor {
|
||||||
|
|
||||||
public ListenableFuture<Void> processAssetMsgFromEdge(TenantId tenantId, Edge edge, AssetUpdateMsg assetUpdateMsg) {
|
public ListenableFuture<Void> processAssetMsgFromEdge(TenantId tenantId, Edge edge, AssetUpdateMsg assetUpdateMsg) {
|
||||||
log.trace("[{}] executing processAssetMsgFromEdge [{}] from edge [{}]", tenantId, assetUpdateMsg, edge.getName());
|
log.trace("[{}] executing processAssetMsgFromEdge [{}] from edge [{}]", tenantId, assetUpdateMsg, edge.getId());
|
||||||
AssetId assetId = new AssetId(new UUID(assetUpdateMsg.getIdMSB(), assetUpdateMsg.getIdLSB()));
|
AssetId assetId = new AssetId(new UUID(assetUpdateMsg.getIdMSB(), assetUpdateMsg.getIdLSB()));
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(edge.getId());
|
edgeSynchronizationManager.getEdgeId().set(edge.getId());
|
||||||
|
|
||||||
switch (assetUpdateMsg.getMsgType()) {
|
switch (assetUpdateMsg.getMsgType()) {
|
||||||
case ENTITY_CREATED_RPC_MESSAGE:
|
case ENTITY_CREATED_RPC_MESSAGE:
|
||||||
@ -78,7 +78,7 @@ public class AssetEdgeProcessor extends BaseAssetProcessor {
|
|||||||
return Futures.immediateFailedFuture(e);
|
return Futures.immediateFailedFuture(e);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
edgeSynchronizationManager.getSync().remove();
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -49,10 +49,10 @@ import java.util.UUID;
|
|||||||
public class AssetProfileEdgeProcessor extends BaseAssetProfileProcessor {
|
public class AssetProfileEdgeProcessor extends BaseAssetProfileProcessor {
|
||||||
|
|
||||||
public ListenableFuture<Void> processAssetProfileMsgFromEdge(TenantId tenantId, Edge edge, AssetProfileUpdateMsg assetProfileUpdateMsg) {
|
public ListenableFuture<Void> processAssetProfileMsgFromEdge(TenantId tenantId, Edge edge, AssetProfileUpdateMsg assetProfileUpdateMsg) {
|
||||||
log.trace("[{}] executing processAssetProfileMsgFromEdge [{}] from edge [{}]", tenantId, assetProfileUpdateMsg, edge.getName());
|
log.trace("[{}] executing processAssetProfileMsgFromEdge [{}] from edge [{}]", tenantId, assetProfileUpdateMsg, edge.getId());
|
||||||
AssetProfileId assetProfileId = new AssetProfileId(new UUID(assetProfileUpdateMsg.getIdMSB(), assetProfileUpdateMsg.getIdLSB()));
|
AssetProfileId assetProfileId = new AssetProfileId(new UUID(assetProfileUpdateMsg.getIdMSB(), assetProfileUpdateMsg.getIdLSB()));
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(edge.getId());
|
edgeSynchronizationManager.getEdgeId().set(edge.getId());
|
||||||
|
|
||||||
switch (assetProfileUpdateMsg.getMsgType()) {
|
switch (assetProfileUpdateMsg.getMsgType()) {
|
||||||
case ENTITY_CREATED_RPC_MESSAGE:
|
case ENTITY_CREATED_RPC_MESSAGE:
|
||||||
@ -68,7 +68,7 @@ public class AssetProfileEdgeProcessor extends BaseAssetProfileProcessor {
|
|||||||
log.warn("[{}] Failed to process AssetProfileUpdateMsg from Edge [{}]", tenantId, assetProfileUpdateMsg, e);
|
log.warn("[{}] Failed to process AssetProfileUpdateMsg from Edge [{}]", tenantId, assetProfileUpdateMsg, e);
|
||||||
return Futures.immediateFailedFuture(e);
|
return Futures.immediateFailedFuture(e);
|
||||||
} finally {
|
} finally {
|
||||||
edgeSynchronizationManager.getSync().remove();
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -45,10 +45,10 @@ import java.util.UUID;
|
|||||||
public class DashboardEdgeProcessor extends BaseDashboardProcessor {
|
public class DashboardEdgeProcessor extends BaseDashboardProcessor {
|
||||||
|
|
||||||
public ListenableFuture<Void> processDashboardMsgFromEdge(TenantId tenantId, Edge edge, DashboardUpdateMsg dashboardUpdateMsg) {
|
public ListenableFuture<Void> processDashboardMsgFromEdge(TenantId tenantId, Edge edge, DashboardUpdateMsg dashboardUpdateMsg) {
|
||||||
log.trace("[{}] executing processDashboardMsgFromEdge [{}] from edge [{}]", tenantId, dashboardUpdateMsg, edge.getName());
|
log.trace("[{}] executing processDashboardMsgFromEdge [{}] from edge [{}]", tenantId, dashboardUpdateMsg, edge.getId());
|
||||||
DashboardId dashboardId = new DashboardId(new UUID(dashboardUpdateMsg.getIdMSB(), dashboardUpdateMsg.getIdLSB()));
|
DashboardId dashboardId = new DashboardId(new UUID(dashboardUpdateMsg.getIdMSB(), dashboardUpdateMsg.getIdLSB()));
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(edge.getId());
|
edgeSynchronizationManager.getEdgeId().set(edge.getId());
|
||||||
|
|
||||||
switch (dashboardUpdateMsg.getMsgType()) {
|
switch (dashboardUpdateMsg.getMsgType()) {
|
||||||
case ENTITY_CREATED_RPC_MESSAGE:
|
case ENTITY_CREATED_RPC_MESSAGE:
|
||||||
@ -73,7 +73,7 @@ public class DashboardEdgeProcessor extends BaseDashboardProcessor {
|
|||||||
return Futures.immediateFailedFuture(e);
|
return Futures.immediateFailedFuture(e);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
edgeSynchronizationManager.getSync().remove();
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -66,10 +66,10 @@ import java.util.UUID;
|
|||||||
public class DeviceEdgeProcessor extends BaseDeviceProcessor {
|
public class DeviceEdgeProcessor extends BaseDeviceProcessor {
|
||||||
|
|
||||||
public ListenableFuture<Void> processDeviceMsgFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
|
public ListenableFuture<Void> processDeviceMsgFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
|
||||||
log.trace("[{}] executing processDeviceMsgFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName());
|
log.trace("[{}] executing processDeviceMsgFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getId());
|
||||||
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
|
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(edge.getId());
|
edgeSynchronizationManager.getEdgeId().set(edge.getId());
|
||||||
|
|
||||||
switch (deviceUpdateMsg.getMsgType()) {
|
switch (deviceUpdateMsg.getMsgType()) {
|
||||||
case ENTITY_CREATED_RPC_MESSAGE:
|
case ENTITY_CREATED_RPC_MESSAGE:
|
||||||
@ -94,18 +94,18 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor {
|
|||||||
return Futures.immediateFailedFuture(e);
|
return Futures.immediateFailedFuture(e);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
edgeSynchronizationManager.getSync().remove();
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public ListenableFuture<Void> processDeviceCredentialsMsgFromEdge(TenantId tenantId, EdgeId edgeId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
|
public ListenableFuture<Void> processDeviceCredentialsMsgFromEdge(TenantId tenantId, EdgeId edgeId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
|
||||||
log.debug("[{}] Executing processDeviceCredentialsMsgFromEdge, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg);
|
log.debug("[{}] Executing processDeviceCredentialsMsgFromEdge, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg);
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(edgeId);
|
edgeSynchronizationManager.getEdgeId().set(edgeId);
|
||||||
|
|
||||||
updateDeviceCredentials(tenantId, deviceCredentialsUpdateMsg);
|
updateDeviceCredentials(tenantId, deviceCredentialsUpdateMsg);
|
||||||
} finally {
|
} finally {
|
||||||
edgeSynchronizationManager.getSync().remove();
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
}
|
}
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -49,10 +49,10 @@ import java.util.UUID;
|
|||||||
public class DeviceProfileEdgeProcessor extends BaseDeviceProfileProcessor {
|
public class DeviceProfileEdgeProcessor extends BaseDeviceProfileProcessor {
|
||||||
|
|
||||||
public ListenableFuture<Void> processDeviceProfileMsgFromEdge(TenantId tenantId, Edge edge, DeviceProfileUpdateMsg deviceProfileUpdateMsg) {
|
public ListenableFuture<Void> processDeviceProfileMsgFromEdge(TenantId tenantId, Edge edge, DeviceProfileUpdateMsg deviceProfileUpdateMsg) {
|
||||||
log.trace("[{}] executing processDeviceProfileMsgFromEdge [{}] from edge [{}]", tenantId, deviceProfileUpdateMsg, edge.getName());
|
log.trace("[{}] executing processDeviceProfileMsgFromEdge [{}] from edge [{}]", tenantId, deviceProfileUpdateMsg, edge.getId());
|
||||||
DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileUpdateMsg.getIdMSB(), deviceProfileUpdateMsg.getIdLSB()));
|
DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileUpdateMsg.getIdMSB(), deviceProfileUpdateMsg.getIdLSB()));
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(edge.getId());
|
edgeSynchronizationManager.getEdgeId().set(edge.getId());
|
||||||
|
|
||||||
switch (deviceProfileUpdateMsg.getMsgType()) {
|
switch (deviceProfileUpdateMsg.getMsgType()) {
|
||||||
case ENTITY_CREATED_RPC_MESSAGE:
|
case ENTITY_CREATED_RPC_MESSAGE:
|
||||||
@ -68,7 +68,7 @@ public class DeviceProfileEdgeProcessor extends BaseDeviceProfileProcessor {
|
|||||||
log.warn("[{}] Failed to process DeviceProfileUpdateMsg from Edge [{}]", tenantId, deviceProfileUpdateMsg, e);
|
log.warn("[{}] Failed to process DeviceProfileUpdateMsg from Edge [{}]", tenantId, deviceProfileUpdateMsg, e);
|
||||||
return Futures.immediateFailedFuture(e);
|
return Futures.immediateFailedFuture(e);
|
||||||
} finally {
|
} finally {
|
||||||
edgeSynchronizationManager.getSync().remove();
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -46,10 +46,10 @@ import java.util.UUID;
|
|||||||
public class EntityViewEdgeProcessor extends BaseEntityViewProcessor {
|
public class EntityViewEdgeProcessor extends BaseEntityViewProcessor {
|
||||||
|
|
||||||
public ListenableFuture<Void> processEntityViewMsgFromEdge(TenantId tenantId, Edge edge, EntityViewUpdateMsg entityViewUpdateMsg) {
|
public ListenableFuture<Void> processEntityViewMsgFromEdge(TenantId tenantId, Edge edge, EntityViewUpdateMsg entityViewUpdateMsg) {
|
||||||
log.trace("[{}] executing processEntityViewMsgFromEdge [{}] from edge [{}]", tenantId, entityViewUpdateMsg, edge.getName());
|
log.trace("[{}] executing processEntityViewMsgFromEdge [{}] from edge [{}]", tenantId, entityViewUpdateMsg, edge.getId());
|
||||||
EntityViewId entityViewId = new EntityViewId(new UUID(entityViewUpdateMsg.getIdMSB(), entityViewUpdateMsg.getIdLSB()));
|
EntityViewId entityViewId = new EntityViewId(new UUID(entityViewUpdateMsg.getIdMSB(), entityViewUpdateMsg.getIdLSB()));
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(edge.getId());
|
edgeSynchronizationManager.getEdgeId().set(edge.getId());
|
||||||
|
|
||||||
switch (entityViewUpdateMsg.getMsgType()) {
|
switch (entityViewUpdateMsg.getMsgType()) {
|
||||||
case ENTITY_CREATED_RPC_MESSAGE:
|
case ENTITY_CREATED_RPC_MESSAGE:
|
||||||
@ -74,7 +74,7 @@ public class EntityViewEdgeProcessor extends BaseEntityViewProcessor {
|
|||||||
return Futures.immediateFailedFuture(e);
|
return Futures.immediateFailedFuture(e);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
edgeSynchronizationManager.getSync().remove();
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -47,18 +47,18 @@ import java.util.Set;
|
|||||||
public class RelationEdgeProcessor extends BaseRelationProcessor {
|
public class RelationEdgeProcessor extends BaseRelationProcessor {
|
||||||
|
|
||||||
public ListenableFuture<Void> processRelationMsgFromEdge(TenantId tenantId, Edge edge, RelationUpdateMsg relationUpdateMsg) {
|
public ListenableFuture<Void> processRelationMsgFromEdge(TenantId tenantId, Edge edge, RelationUpdateMsg relationUpdateMsg) {
|
||||||
log.trace("[{}] executing processRelationMsgFromEdge [{}] from edge [{}]", tenantId, relationUpdateMsg, edge.getName());
|
log.trace("[{}] executing processRelationMsgFromEdge [{}] from edge [{}]", tenantId, relationUpdateMsg, edge.getId());
|
||||||
try {
|
try {
|
||||||
edgeSynchronizationManager.getSync().set(edge.getId());
|
edgeSynchronizationManager.getEdgeId().set(edge.getId());
|
||||||
|
|
||||||
return processRelationMsg(tenantId, relationUpdateMsg);
|
return processRelationMsg(tenantId, relationUpdateMsg);
|
||||||
} finally {
|
} finally {
|
||||||
edgeSynchronizationManager.getSync().remove();
|
edgeSynchronizationManager.getEdgeId().remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public DownlinkMsg convertRelationEventToDownlink(EdgeEvent edgeEvent) {
|
public DownlinkMsg convertRelationEventToDownlink(EdgeEvent edgeEvent) {
|
||||||
EntityRelation entityRelation = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), EntityRelation.class);
|
EntityRelation entityRelation = JacksonUtil.convertValue(edgeEvent.getBody(), EntityRelation.class);
|
||||||
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
|
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
|
||||||
RelationUpdateMsg relationUpdateMsg = relationMsgConstructor.constructRelationUpdatedMsg(msgType, entityRelation);
|
RelationUpdateMsg relationUpdateMsg = relationMsgConstructor.constructRelationUpdatedMsg(msgType, entityRelation);
|
||||||
return DownlinkMsg.newBuilder()
|
return DownlinkMsg.newBuilder()
|
||||||
@ -87,7 +87,7 @@ public class RelationEdgeProcessor extends BaseRelationProcessor {
|
|||||||
EdgeEventType.RELATION,
|
EdgeEventType.RELATION,
|
||||||
EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()),
|
EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()),
|
||||||
null,
|
null,
|
||||||
JacksonUtil.OBJECT_MAPPER.valueToTree(relation)));
|
JacksonUtil.valueToTree(relation)));
|
||||||
}
|
}
|
||||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -32,7 +32,10 @@ import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
|||||||
public class AdminSettingsEdgeProcessor extends BaseEdgeProcessor {
|
public class AdminSettingsEdgeProcessor extends BaseEdgeProcessor {
|
||||||
|
|
||||||
public DownlinkMsg convertAdminSettingsEventToDownlink(EdgeEvent edgeEvent) {
|
public DownlinkMsg convertAdminSettingsEventToDownlink(EdgeEvent edgeEvent) {
|
||||||
AdminSettings adminSettings = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), AdminSettings.class);
|
AdminSettings adminSettings = JacksonUtil.convertValue(edgeEvent.getBody(), AdminSettings.class);
|
||||||
|
if (adminSettings == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
AdminSettingsUpdateMsg adminSettingsUpdateMsg = adminSettingsMsgConstructor.constructAdminSettingsUpdateMsg(adminSettings);
|
AdminSettingsUpdateMsg adminSettingsUpdateMsg = adminSettingsMsgConstructor.constructAdminSettingsUpdateMsg(adminSettings);
|
||||||
return DownlinkMsg.newBuilder()
|
return DownlinkMsg.newBuilder()
|
||||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||||
|
|||||||
@ -176,7 +176,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
|||||||
if (attributes.size() > 0) {
|
if (attributes.size() > 0) {
|
||||||
entityData.put("kv", attributes);
|
entityData.put("kv", attributes);
|
||||||
entityData.put("scope", scope);
|
entityData.put("scope", scope);
|
||||||
JsonNode body = JacksonUtil.OBJECT_MAPPER.valueToTree(entityData);
|
JsonNode body = JacksonUtil.valueToTree(entityData);
|
||||||
log.debug("[{}] Sending attributes data msg, entityId [{}], attributes [{}]", tenantId, entityId, body);
|
log.debug("[{}] Sending attributes data msg, entityId [{}], attributes [{}]", tenantId, entityId, body);
|
||||||
future = saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body);
|
future = saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body);
|
||||||
} else {
|
} else {
|
||||||
@ -249,7 +249,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
|||||||
EdgeEventType.RELATION,
|
EdgeEventType.RELATION,
|
||||||
EdgeEventActionType.ADDED,
|
EdgeEventActionType.ADDED,
|
||||||
null,
|
null,
|
||||||
JacksonUtil.OBJECT_MAPPER.valueToTree(relation)));
|
JacksonUtil.valueToTree(relation)));
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String errMsg = String.format("[%s][%s] Exception during loading relation [%s] to edge on sync!", tenantId, edge.getId(), relation);
|
String errMsg = String.format("[%s][%s] Exception during loading relation [%s] to edge on sync!", tenantId, edge.getId(), relation);
|
||||||
|
|||||||
@ -19,7 +19,5 @@ import org.thingsboard.server.common.data.id.EdgeId;
|
|||||||
|
|
||||||
public interface EdgeSynchronizationManager {
|
public interface EdgeSynchronizationManager {
|
||||||
|
|
||||||
ThreadLocal<EdgeId> getSync();
|
ThreadLocal<EdgeId> getEdgeId();
|
||||||
|
|
||||||
EdgeId getEdgeId();
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -25,10 +25,5 @@ import org.thingsboard.server.common.data.id.EdgeId;
|
|||||||
public class DefaultEdgeSynchronizationManager implements EdgeSynchronizationManager {
|
public class DefaultEdgeSynchronizationManager implements EdgeSynchronizationManager {
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
private final ThreadLocal<EdgeId> sync = new ThreadLocal<>();
|
private final ThreadLocal<EdgeId> edgeId = new ThreadLocal<>();
|
||||||
|
|
||||||
@Override
|
|
||||||
public EdgeId getEdgeId() {
|
|
||||||
return this.sync.get();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -104,7 +104,7 @@ public class TbSendRPCReplyNode implements TbNode {
|
|||||||
body.put("requestId", requestIdStr);
|
body.put("requestId", requestIdStr);
|
||||||
body.put("response", msg.getData());
|
body.put("response", msg.getData());
|
||||||
EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(ctx.getTenantId(), edgeId, EdgeEventType.DEVICE,
|
EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(ctx.getTenantId(), edgeId, EdgeEventType.DEVICE,
|
||||||
EdgeEventActionType.RPC_CALL, deviceId, JacksonUtil.OBJECT_MAPPER.valueToTree(body));
|
EdgeEventActionType.RPC_CALL, deviceId, JacksonUtil.valueToTree(body));
|
||||||
ListenableFuture<Void> future = ctx.getEdgeEventService().saveAsync(edgeEvent);
|
ListenableFuture<Void> future = ctx.getEdgeEventService().saveAsync(edgeEvent);
|
||||||
Futures.addCallback(future, new FutureCallback<>() {
|
Futures.addCallback(future, new FutureCallback<>() {
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user