Added proper handling of edge event save methods - clients aware of failures
This commit is contained in:
parent
d3eda0c473
commit
bb588ac238
@ -107,6 +107,7 @@ import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -200,8 +201,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
boolean sent = false;
|
||||
if (systemContext.isEdgesEnabled() && edgeId != null) {
|
||||
log.debug("[{}][{}] device is related to edge [{}]. Saving RPC request to edge queue", tenantId, deviceId, edgeId.getId());
|
||||
saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId());
|
||||
try {
|
||||
saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId()).get();
|
||||
sent = true;
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
String errMsg = String.format("[%s][%s][%s] Failed to save rpc request to edge queue %s", tenantId, deviceId, edgeId.getId(), request);
|
||||
log.error(errMsg, e);
|
||||
}
|
||||
} else if (isSendNewRpcAvailable()) {
|
||||
sent = rpcSubscriptions.size() > 0;
|
||||
Set<UUID> syncSessionSet = new HashSet<>();
|
||||
@ -810,7 +816,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
systemContext.getTbCoreToTransportService().process(nodeId, msg);
|
||||
}
|
||||
|
||||
private void saveRpcRequestToEdgeQueue(ToDeviceRpcRequest msg, Integer requestId) {
|
||||
private ListenableFuture<Void> saveRpcRequestToEdgeQueue(ToDeviceRpcRequest msg, Integer requestId) {
|
||||
ObjectNode body = mapper.createObjectNode();
|
||||
body.put("requestId", requestId);
|
||||
body.put("requestUUID", msg.getId().toString());
|
||||
@ -821,17 +827,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
|
||||
EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.DEVICE, EdgeEventActionType.RPC_CALL, deviceId, body);
|
||||
|
||||
Futures.addCallback(systemContext.getEdgeEventService().saveAsync(edgeEvent), new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(Void unused) {
|
||||
return Futures.transform(systemContext.getEdgeEventService().saveAsync(edgeEvent), unused -> {
|
||||
systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
String errMsg = String.format("Failed to save edge event. msg [%s], edge event [%s]", msg, edgeEvent);
|
||||
log.warn(errMsg, t);
|
||||
}
|
||||
return null;
|
||||
}, systemContext.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
|
||||
@ -180,7 +180,7 @@ public class EdgeController extends BaseController {
|
||||
}
|
||||
}
|
||||
|
||||
private void onEdgeCreatedOrUpdated(TenantId tenantId, Edge edge, RuleChain edgeTemplateRootRuleChain, boolean updated, SecurityUser user) throws IOException, ThingsboardException {
|
||||
private void onEdgeCreatedOrUpdated(TenantId tenantId, Edge edge, RuleChain edgeTemplateRootRuleChain, boolean updated, SecurityUser user) throws Exception {
|
||||
if (!updated) {
|
||||
ruleChainService.assignRuleChainToEdge(tenantId, edgeTemplateRootRuleChain.getId(), edge.getId());
|
||||
edgeNotificationService.setEdgeRootRuleChain(tenantId, edge, edgeTemplateRootRuleChain.getId());
|
||||
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@ -46,7 +47,6 @@ import org.thingsboard.server.service.edge.rpc.processor.RelationEdgeProcessor;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
@ -95,14 +95,14 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException {
|
||||
public Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws Exception {
|
||||
edge.setRootRuleChainId(ruleChainId);
|
||||
Edge savedEdge = edgeService.saveEdge(edge);
|
||||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN, EdgeEventActionType.UPDATED, ruleChainId, null);
|
||||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN, EdgeEventActionType.UPDATED, ruleChainId, null).get();
|
||||
return savedEdge;
|
||||
}
|
||||
|
||||
private void saveEdgeEvent(TenantId tenantId,
|
||||
private ListenableFuture<Void> saveEdgeEvent(TenantId tenantId,
|
||||
EdgeId edgeId,
|
||||
EdgeEventType type,
|
||||
EdgeEventActionType action,
|
||||
@ -113,17 +113,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
|
||||
|
||||
EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body);
|
||||
|
||||
Futures.addCallback(edgeEventService.saveAsync(edgeEvent), new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Void unused) {
|
||||
return Futures.transform(edgeEventService.saveAsync(edgeEvent), unused -> {
|
||||
clusterService.onEdgeEventUpdate(tenantId, edgeId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
String errMsg = String.format("Failed to save edge event. edge event [%s]", edgeEvent);
|
||||
log.warn(errMsg, t);
|
||||
}
|
||||
return null;
|
||||
}, dbCallBackExecutor);
|
||||
}
|
||||
|
||||
@ -133,9 +125,10 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
|
||||
try {
|
||||
TenantId tenantId = TenantId.fromUUID(new UUID(edgeNotificationMsg.getTenantIdMSB(), edgeNotificationMsg.getTenantIdLSB()));
|
||||
EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType());
|
||||
ListenableFuture<Void> future;
|
||||
switch (type) {
|
||||
case EDGE:
|
||||
edgeProcessor.processEdgeNotification(tenantId, edgeNotificationMsg);
|
||||
future = edgeProcessor.processEdgeNotification(tenantId, edgeNotificationMsg);
|
||||
break;
|
||||
case USER:
|
||||
case ASSET:
|
||||
@ -144,31 +137,45 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
|
||||
case ENTITY_VIEW:
|
||||
case DASHBOARD:
|
||||
case RULE_CHAIN:
|
||||
entityProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
|
||||
future = entityProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
|
||||
break;
|
||||
case CUSTOMER:
|
||||
customerProcessor.processCustomerNotification(tenantId, edgeNotificationMsg);
|
||||
future = customerProcessor.processCustomerNotification(tenantId, edgeNotificationMsg);
|
||||
break;
|
||||
case WIDGETS_BUNDLE:
|
||||
case WIDGET_TYPE:
|
||||
entityProcessor.processEntityNotificationForAllEdges(tenantId, edgeNotificationMsg);
|
||||
future = entityProcessor.processEntityNotificationForAllEdges(tenantId, edgeNotificationMsg);
|
||||
break;
|
||||
case ALARM:
|
||||
alarmProcessor.processAlarmNotification(tenantId, edgeNotificationMsg);
|
||||
future = alarmProcessor.processAlarmNotification(tenantId, edgeNotificationMsg);
|
||||
break;
|
||||
case RELATION:
|
||||
relationProcessor.processRelationNotification(tenantId, edgeNotificationMsg);
|
||||
future = relationProcessor.processRelationNotification(tenantId, edgeNotificationMsg);
|
||||
break;
|
||||
default:
|
||||
log.warn("Edge event type [{}] is not designed to be pushed to edge", type);
|
||||
future = Futures.immediateFuture(null);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
callback.onFailure(e);
|
||||
String errMsg = String.format("Can't push to edge updates, edgeNotificationMsg [%s]", edgeNotificationMsg);
|
||||
log.error(errMsg, e);
|
||||
} finally {
|
||||
Futures.addCallback(future, new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Void unused) {
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable throwable) {
|
||||
callBackFailure(edgeNotificationMsg, callback, throwable);
|
||||
}
|
||||
}, dbCallBackExecutor);
|
||||
} catch (Exception e) {
|
||||
callBackFailure(edgeNotificationMsg, callback, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void callBackFailure(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback, Throwable throwable) {
|
||||
String errMsg = String.format("Can't push to edge updates, edgeNotificationMsg [%s]", edgeNotificationMsg);
|
||||
log.error(errMsg, throwable);
|
||||
callback.onFailure(throwable);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -21,11 +21,9 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public interface EdgeNotificationService {
|
||||
|
||||
Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException;
|
||||
Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws Exception;
|
||||
|
||||
void pushNotificationToEdge(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback);
|
||||
}
|
||||
|
||||
@ -16,11 +16,9 @@
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
@ -43,6 +41,8 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
@ -148,49 +148,44 @@ public class AlarmEdgeProcessor extends BaseEdgeProcessor {
|
||||
return downlinkMsg;
|
||||
}
|
||||
|
||||
public void processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException {
|
||||
public ListenableFuture<Void> processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException {
|
||||
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
|
||||
AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
|
||||
switch (actionType) {
|
||||
case DELETED:
|
||||
EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB()));
|
||||
Alarm alarm = mapper.readValue(edgeNotificationMsg.getBody(), Alarm.class);
|
||||
saveEdgeEvent(tenantId, edgeId, EdgeEventType.ALARM, actionType, alarmId, mapper.valueToTree(alarm));
|
||||
break;
|
||||
Alarm deletedAlarm = mapper.readValue(edgeNotificationMsg.getBody(), Alarm.class);
|
||||
return saveEdgeEvent(tenantId, edgeId, EdgeEventType.ALARM, actionType, alarmId, mapper.valueToTree(deletedAlarm));
|
||||
default:
|
||||
ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId);
|
||||
Futures.addCallback(alarmFuture, new FutureCallback<Alarm>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Alarm alarm) {
|
||||
if (alarm != null) {
|
||||
return Futures.transformAsync(alarmFuture, alarm -> {
|
||||
if (alarm == null) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType());
|
||||
if (type != null) {
|
||||
if (type == null) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||
PageData<EdgeId> pageData;
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
do {
|
||||
pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator(), pageLink);
|
||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
||||
for (EdgeId edgeId : pageData.getData()) {
|
||||
saveEdgeEvent(tenantId,
|
||||
edgeId,
|
||||
for (EdgeId relatedEdgeId : pageData.getData()) {
|
||||
futures.add(saveEdgeEvent(tenantId,
|
||||
relatedEdgeId,
|
||||
EdgeEventType.ALARM,
|
||||
EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()),
|
||||
alarmId,
|
||||
null);
|
||||
null));
|
||||
}
|
||||
if (pageData.hasNext()) {
|
||||
pageLink = pageLink.nextPageLink();
|
||||
}
|
||||
}
|
||||
} while (pageData != null && pageData.hasNext());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.warn("[{}] can't find alarm by id [{}] {}", tenantId.getId(), alarmId.getId(), t);
|
||||
}
|
||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||
}, dbCallbackExecutorService);
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,10 +17,9 @@ package org.thingsboard.server.service.edge.rpc.processor;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.thingsboard.server.cluster.TbClusterService;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
@ -71,6 +70,9 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
import org.thingsboard.server.service.state.DeviceStateService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
public abstract class BaseEdgeProcessor {
|
||||
|
||||
@ -183,7 +185,7 @@ public abstract class BaseEdgeProcessor {
|
||||
@Autowired
|
||||
protected DbCallbackExecutorService dbCallbackExecutorService;
|
||||
|
||||
protected void saveEdgeEvent(TenantId tenantId,
|
||||
protected ListenableFuture<Void> saveEdgeEvent(TenantId tenantId,
|
||||
EdgeId edgeId,
|
||||
EdgeEventType type,
|
||||
EdgeEventActionType action,
|
||||
@ -195,17 +197,9 @@ public abstract class BaseEdgeProcessor {
|
||||
|
||||
EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body);
|
||||
|
||||
Futures.addCallback(edgeEventService.saveAsync(edgeEvent), new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Void unused) {
|
||||
return Futures.transform(edgeEventService.saveAsync(edgeEvent), unused -> {
|
||||
tbClusterService.onEdgeEventUpdate(tenantId, edgeId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
String errMsg = String.format("Failed to save edge event. edge event [%s]", edgeEvent);
|
||||
log.warn(errMsg, t);
|
||||
}
|
||||
return null;
|
||||
}, dbCallbackExecutorService);
|
||||
}
|
||||
|
||||
@ -217,19 +211,21 @@ public abstract class BaseEdgeProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
protected void processActionForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId) {
|
||||
protected ListenableFuture<Void> processActionForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId) {
|
||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||
PageData<Edge> pageData;
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
do {
|
||||
pageData = edgeService.findEdgesByTenantId(tenantId, pageLink);
|
||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
||||
for (Edge edge : pageData.getData()) {
|
||||
saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, null);
|
||||
futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, null));
|
||||
}
|
||||
if (pageData.hasNext()) {
|
||||
pageLink = pageLink.nextPageLink();
|
||||
}
|
||||
}
|
||||
} while (pageData != null && pageData.hasNext());
|
||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||
}
|
||||
}
|
||||
|
||||
@ -15,6 +15,8 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.Customer;
|
||||
@ -35,6 +37,8 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
@ -70,7 +74,7 @@ public class CustomerEdgeProcessor extends BaseEdgeProcessor {
|
||||
return downlinkMsg;
|
||||
}
|
||||
|
||||
public void processCustomerNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
||||
public ListenableFuture<Void> processCustomerNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
||||
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
|
||||
EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType());
|
||||
UUID uuid = new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB());
|
||||
@ -79,22 +83,24 @@ public class CustomerEdgeProcessor extends BaseEdgeProcessor {
|
||||
case UPDATED:
|
||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||
PageData<Edge> pageData;
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
do {
|
||||
pageData = edgeService.findEdgesByTenantIdAndCustomerId(tenantId, customerId, pageLink);
|
||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
||||
for (Edge edge : pageData.getData()) {
|
||||
saveEdgeEvent(tenantId, edge.getId(), type, actionType, customerId, null);
|
||||
futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, customerId, null));
|
||||
}
|
||||
if (pageData.hasNext()) {
|
||||
pageLink = pageLink.nextPageLink();
|
||||
}
|
||||
}
|
||||
} while (pageData != null && pageData.hasNext());
|
||||
break;
|
||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||
case DELETED:
|
||||
EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB()));
|
||||
saveEdgeEvent(tenantId, edgeId, type, actionType, customerId, null);
|
||||
break;
|
||||
return saveEdgeEvent(tenantId, edgeId, type, actionType, customerId, null);
|
||||
default:
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -84,7 +84,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
||||
if (deviceAlreadyExistsForThisEdge) {
|
||||
log.info("[{}] Device with name '{}' already exists on the cloud, and related to this edge [{}]. " +
|
||||
"deviceUpdateMsg [{}], Updating device", tenantId, deviceName, edge.getId(), deviceUpdateMsg);
|
||||
updateDevice(tenantId, edge, deviceUpdateMsg);
|
||||
return updateDevice(tenantId, edge, deviceUpdateMsg);
|
||||
} else {
|
||||
log.info("[{}] Device with name '{}' already exists on the cloud, but not related to this edge [{}]. deviceUpdateMsg [{}]." +
|
||||
"Creating a new device with random prefix and relate to this edge", tenantId, deviceName, edge.getId(), deviceUpdateMsg);
|
||||
@ -99,8 +99,10 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
||||
}
|
||||
ObjectNode body = mapper.createObjectNode();
|
||||
body.put("conflictName", deviceName);
|
||||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, newDevice.getId(), body);
|
||||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, newDevice.getId(), null);
|
||||
ListenableFuture<Void> input = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, newDevice.getId(), body);
|
||||
return Futures.transformAsync(input, unused ->
|
||||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, newDevice.getId(), null),
|
||||
dbCallbackExecutorService);
|
||||
}
|
||||
} else {
|
||||
log.info("[{}] Creating new device and replacing device entity on the edge [{}]", tenantId, deviceUpdateMsg);
|
||||
@ -111,24 +113,22 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
||||
log.error(errMsg, e);
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, device.getId(), null);
|
||||
return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, device.getId(), null);
|
||||
}
|
||||
break;
|
||||
case ENTITY_UPDATED_RPC_MESSAGE:
|
||||
updateDevice(tenantId, edge, deviceUpdateMsg);
|
||||
break;
|
||||
return updateDevice(tenantId, edge, deviceUpdateMsg);
|
||||
case ENTITY_DELETED_RPC_MESSAGE:
|
||||
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
|
||||
Device deviceToDelete = deviceService.findDeviceById(tenantId, deviceId);
|
||||
if (deviceToDelete != null) {
|
||||
deviceService.unassignDeviceFromEdge(tenantId, deviceId, edge.getId());
|
||||
}
|
||||
break;
|
||||
return Futures.immediateFuture(null);
|
||||
case UNRECOGNIZED:
|
||||
default:
|
||||
log.error("Unsupported msg type {}", deviceUpdateMsg.getMsgType());
|
||||
return Futures.immediateFailedFuture(new RuntimeException("Unsupported msg type " + deviceUpdateMsg.getMsgType()));
|
||||
}
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
|
||||
private boolean isDeviceAlreadyExistsOnCloudForThisEdge(TenantId tenantId, Edge edge, Device device) {
|
||||
@ -174,7 +174,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
||||
}
|
||||
|
||||
|
||||
private void updateDevice(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
|
||||
private ListenableFuture<Void> updateDevice(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
|
||||
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
|
||||
Device device = deviceService.findDeviceById(tenantId, deviceId);
|
||||
if (device != null) {
|
||||
@ -194,9 +194,11 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
||||
}
|
||||
Device savedDevice = deviceService.saveDevice(device);
|
||||
tbClusterService.onDeviceUpdated(savedDevice, device);
|
||||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null);
|
||||
return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null);
|
||||
} else {
|
||||
log.warn("[{}] can't find device [{}], edge [{}]", tenantId, deviceUpdateMsg, edge.getId());
|
||||
String errMsg = String.format("[%s] can't find device [%s], edge [%s]", tenantId, deviceUpdateMsg, edge.getId());
|
||||
log.warn(errMsg);
|
||||
return Futures.immediateFailedFuture(new RuntimeException(errMsg));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -15,11 +15,9 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.User;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
@ -33,6 +31,8 @@ import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
@ -40,7 +40,7 @@ import java.util.UUID;
|
||||
@TbCoreComponent
|
||||
public class EdgeProcessor extends BaseEdgeProcessor {
|
||||
|
||||
public void processEdgeNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
||||
public ListenableFuture<Void> processEdgeNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
||||
try {
|
||||
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
|
||||
EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
|
||||
@ -49,11 +49,12 @@ public class EdgeProcessor extends BaseEdgeProcessor {
|
||||
case ASSIGNED_TO_CUSTOMER:
|
||||
CustomerId customerId = mapper.readValue(edgeNotificationMsg.getBody(), CustomerId.class);
|
||||
edgeFuture = edgeService.findEdgeByIdAsync(tenantId, edgeId);
|
||||
Futures.addCallback(edgeFuture, new FutureCallback<Edge>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Edge edge) {
|
||||
if (edge != null && !customerId.isNullUid()) {
|
||||
saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, customerId, null);
|
||||
return Futures.transformAsync(edgeFuture, edge -> {
|
||||
if (edge == null || customerId.isNullUid()) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, customerId, null));
|
||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||
PageData<User> pageData;
|
||||
do {
|
||||
@ -61,42 +62,30 @@ public class EdgeProcessor extends BaseEdgeProcessor {
|
||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
||||
log.trace("[{}] [{}] user(s) are going to be added to edge.", edge.getId(), pageData.getData().size());
|
||||
for (User user : pageData.getData()) {
|
||||
saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null);
|
||||
futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null));
|
||||
}
|
||||
if (pageData.hasNext()) {
|
||||
pageLink = pageLink.nextPageLink();
|
||||
}
|
||||
}
|
||||
} while (pageData != null && pageData.hasNext());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.error("Can't find edge by id [{}]", edgeNotificationMsg, t);
|
||||
}
|
||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||
}, dbCallbackExecutorService);
|
||||
break;
|
||||
case UNASSIGNED_FROM_CUSTOMER:
|
||||
CustomerId customerIdToDelete = mapper.readValue(edgeNotificationMsg.getBody(), CustomerId.class);
|
||||
edgeFuture = edgeService.findEdgeByIdAsync(tenantId, edgeId);
|
||||
Futures.addCallback(edgeFuture, new FutureCallback<Edge>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Edge edge) {
|
||||
if (edge != null && !customerIdToDelete.isNullUid()) {
|
||||
saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.DELETED, customerIdToDelete, null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.error("Can't find edge by id [{}]", edgeNotificationMsg, t);
|
||||
return Futures.transformAsync(edgeFuture, edge -> {
|
||||
if (edge == null || customerIdToDelete.isNullUid()) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
return saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.DELETED, customerIdToDelete, null);
|
||||
}, dbCallbackExecutorService);
|
||||
break;
|
||||
default:
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Exception during processing edge event", e);
|
||||
return Futures.immediateFailedFuture(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -15,11 +15,9 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
@ -45,6 +43,7 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@ -89,25 +88,49 @@ public class EntityEdgeProcessor extends BaseEdgeProcessor {
|
||||
return downlinkMsg;
|
||||
}
|
||||
|
||||
public void processEntityNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
||||
public ListenableFuture<Void> processEntityNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
||||
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
|
||||
EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType());
|
||||
EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(type,
|
||||
new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
|
||||
EdgeId edgeId = null;
|
||||
if (edgeNotificationMsg.getEdgeIdMSB() != 0 && edgeNotificationMsg.getEdgeIdLSB() != 0) {
|
||||
edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB()));
|
||||
}
|
||||
EdgeId edgeId = safeGetEdgeId(edgeNotificationMsg);
|
||||
switch (actionType) {
|
||||
case ADDED: // used only for USER entity
|
||||
case UPDATED:
|
||||
case CREDENTIALS_UPDATED:
|
||||
pushNotificationToAllRelatedEdges(tenantId, entityId, type, actionType);
|
||||
break;
|
||||
return pushNotificationToAllRelatedEdges(tenantId, entityId, type, actionType);
|
||||
case ASSIGNED_TO_CUSTOMER:
|
||||
case UNASSIGNED_FROM_CUSTOMER:
|
||||
return pushNotificationToAllRelatedCustomerEdges(tenantId, edgeNotificationMsg, entityId, actionType, type);
|
||||
case DELETED:
|
||||
if (edgeId != null) {
|
||||
return saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, null);
|
||||
} else {
|
||||
return pushNotificationToAllRelatedEdges(tenantId, entityId, type, actionType);
|
||||
}
|
||||
case ASSIGNED_TO_EDGE:
|
||||
case UNASSIGNED_FROM_EDGE:
|
||||
ListenableFuture<Void> future = saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, null);
|
||||
return Futures.transformAsync(future, unused -> {
|
||||
if (type.equals(EdgeEventType.RULE_CHAIN)) {
|
||||
return updateDependentRuleChains(tenantId, new RuleChainId(entityId.getId()), edgeId);
|
||||
} else {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
}, dbCallbackExecutorService);
|
||||
default:
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
}
|
||||
|
||||
private ListenableFuture<Void> pushNotificationToAllRelatedCustomerEdges(TenantId tenantId,
|
||||
TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg,
|
||||
EntityId entityId,
|
||||
EdgeEventActionType actionType,
|
||||
EdgeEventType type) {
|
||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||
PageData<EdgeId> pageData;
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
do {
|
||||
pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink);
|
||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
||||
@ -115,22 +138,17 @@ public class EntityEdgeProcessor extends BaseEdgeProcessor {
|
||||
try {
|
||||
CustomerId customerId = mapper.readValue(edgeNotificationMsg.getBody(), CustomerId.class);
|
||||
ListenableFuture<Edge> future = edgeService.findEdgeByIdAsync(tenantId, relatedEdgeId);
|
||||
Futures.addCallback(future, new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Edge edge) {
|
||||
futures.add(Futures.transformAsync(future, edge -> {
|
||||
if (edge != null && edge.getCustomerId() != null &&
|
||||
!edge.getCustomerId().isNullUid() && edge.getCustomerId().equals(customerId)) {
|
||||
saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null);
|
||||
return saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null);
|
||||
} else {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.error("Failed to find edge by id [{}] {}", edgeNotificationMsg, t);
|
||||
}
|
||||
}, dbCallbackExecutorService);
|
||||
}, dbCallbackExecutorService));
|
||||
} catch (Exception e) {
|
||||
log.error("Can't parse customer id from entity body [{}]", edgeNotificationMsg, e);
|
||||
return Futures.immediateFailedFuture(e);
|
||||
}
|
||||
}
|
||||
if (pageData.hasNext()) {
|
||||
@ -138,43 +156,39 @@ public class EntityEdgeProcessor extends BaseEdgeProcessor {
|
||||
}
|
||||
}
|
||||
} while (pageData != null && pageData.hasNext());
|
||||
break;
|
||||
case DELETED:
|
||||
if (edgeId != null) {
|
||||
saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, null);
|
||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||
}
|
||||
|
||||
private EdgeId safeGetEdgeId(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
||||
if (edgeNotificationMsg.getEdgeIdMSB() != 0 && edgeNotificationMsg.getEdgeIdLSB() != 0) {
|
||||
return new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB()));
|
||||
} else {
|
||||
pushNotificationToAllRelatedEdges(tenantId, entityId, type, actionType);
|
||||
}
|
||||
break;
|
||||
case ASSIGNED_TO_EDGE:
|
||||
case UNASSIGNED_FROM_EDGE:
|
||||
saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, null);
|
||||
if (type.equals(EdgeEventType.RULE_CHAIN)) {
|
||||
updateDependentRuleChains(tenantId, new RuleChainId(entityId.getId()), edgeId);
|
||||
}
|
||||
break;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private void pushNotificationToAllRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type, EdgeEventActionType actionType) {
|
||||
private ListenableFuture<Void> pushNotificationToAllRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type, EdgeEventActionType actionType) {
|
||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||
PageData<EdgeId> pageData;
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
do {
|
||||
pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink);
|
||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
||||
for (EdgeId relatedEdgeId : pageData.getData()) {
|
||||
saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null);
|
||||
futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null));
|
||||
}
|
||||
if (pageData.hasNext()) {
|
||||
pageLink = pageLink.nextPageLink();
|
||||
}
|
||||
}
|
||||
} while (pageData != null && pageData.hasNext());
|
||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||
}
|
||||
|
||||
private void updateDependentRuleChains(TenantId tenantId, RuleChainId processingRuleChainId, EdgeId edgeId) {
|
||||
private ListenableFuture<Void> updateDependentRuleChains(TenantId tenantId, RuleChainId processingRuleChainId, EdgeId edgeId) {
|
||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||
PageData<RuleChain> pageData;
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
do {
|
||||
pageData = ruleChainService.findRuleChainsByTenantIdAndEdgeId(tenantId, edgeId, pageLink);
|
||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
||||
@ -185,12 +199,12 @@ public class EntityEdgeProcessor extends BaseEdgeProcessor {
|
||||
if (connectionInfos != null && !connectionInfos.isEmpty()) {
|
||||
for (RuleChainConnectionInfo connectionInfo : connectionInfos) {
|
||||
if (connectionInfo.getTargetRuleChainId().equals(processingRuleChainId)) {
|
||||
saveEdgeEvent(tenantId,
|
||||
futures.add(saveEdgeEvent(tenantId,
|
||||
edgeId,
|
||||
EdgeEventType.RULE_CHAIN_METADATA,
|
||||
EdgeEventActionType.UPDATED,
|
||||
ruleChain.getId(),
|
||||
null);
|
||||
null));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -201,9 +215,10 @@ public class EntityEdgeProcessor extends BaseEdgeProcessor {
|
||||
}
|
||||
}
|
||||
} while (pageData != null && pageData.hasNext());
|
||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||
}
|
||||
|
||||
public void processEntityNotificationForAllEdges(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
||||
public ListenableFuture<Void> processEntityNotificationForAllEdges(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
||||
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
|
||||
EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType());
|
||||
EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(type, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
|
||||
@ -211,8 +226,9 @@ public class EntityEdgeProcessor extends BaseEdgeProcessor {
|
||||
case ADDED:
|
||||
case UPDATED:
|
||||
case DELETED:
|
||||
processActionForAllEdges(tenantId, type, actionType, entityId);
|
||||
break;
|
||||
return processActionForAllEdges(tenantId, type, actionType, entityId);
|
||||
default:
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -126,24 +126,29 @@ public class RelationEdgeProcessor extends BaseEdgeProcessor {
|
||||
.build();
|
||||
}
|
||||
|
||||
public void processRelationNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException {
|
||||
public ListenableFuture<Void> processRelationNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException {
|
||||
EntityRelation relation = mapper.readValue(edgeNotificationMsg.getBody(), EntityRelation.class);
|
||||
if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) &&
|
||||
!relation.getTo().getEntityType().equals(EntityType.EDGE)) {
|
||||
if (relation.getFrom().getEntityType().equals(EntityType.EDGE) ||
|
||||
relation.getTo().getEntityType().equals(EntityType.EDGE)) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
|
||||
Set<EdgeId> uniqueEdgeIds = new HashSet<>();
|
||||
uniqueEdgeIds.addAll(findRelatedEdgeIds(tenantId, relation.getTo()));
|
||||
uniqueEdgeIds.addAll(findRelatedEdgeIds(tenantId, relation.getFrom()));
|
||||
if (!uniqueEdgeIds.isEmpty()) {
|
||||
if (uniqueEdgeIds.isEmpty()) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
for (EdgeId edgeId : uniqueEdgeIds) {
|
||||
saveEdgeEvent(tenantId,
|
||||
futures.add(saveEdgeEvent(tenantId,
|
||||
edgeId,
|
||||
EdgeEventType.RELATION,
|
||||
EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()),
|
||||
null,
|
||||
mapper.valueToTree(relation));
|
||||
}
|
||||
}
|
||||
mapper.valueToTree(relation)));
|
||||
}
|
||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||
}
|
||||
|
||||
private List<EdgeId> findRelatedEdgeIds(TenantId tenantId, EntityId entityId) {
|
||||
|
||||
@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.cluster.TbClusterService;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
@ -72,7 +73,6 @@ import org.thingsboard.server.gen.edge.v1.RuleChainMetadataRequestMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UserCredentialsRequestMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.WidgetBundleTypesRequestMsg;
|
||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||
import org.thingsboard.server.cluster.TbClusterService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
@ -121,14 +121,14 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
||||
@Override
|
||||
public ListenableFuture<Void> processRuleChainMetadataRequestMsg(TenantId tenantId, Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg) {
|
||||
log.trace("[{}] processRuleChainMetadataRequestMsg [{}][{}]", tenantId, edge.getName(), ruleChainMetadataRequestMsg);
|
||||
if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) {
|
||||
if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() == 0 || ruleChainMetadataRequestMsg.getRuleChainIdLSB() == 0) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
RuleChainId ruleChainId =
|
||||
new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB()));
|
||||
saveEdgeEvent(tenantId, edge.getId(),
|
||||
return saveEdgeEvent(tenantId, edge.getId(),
|
||||
EdgeEventType.RULE_CHAIN_METADATA, EdgeEventActionType.ADDED, ruleChainId, null);
|
||||
}
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> processAttributesRequestMsg(TenantId tenantId, Edge edge, AttributesRequestMsg attributesRequestMsg) {
|
||||
@ -137,14 +137,25 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
||||
EntityType.valueOf(attributesRequestMsg.getEntityType()),
|
||||
new UUID(attributesRequestMsg.getEntityIdMSB(), attributesRequestMsg.getEntityIdLSB()));
|
||||
final EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(entityId.getEntityType());
|
||||
if (type != null) {
|
||||
if (type == null) {
|
||||
log.warn("[{}] Type doesn't supported {}", tenantId, entityId.getEntityType());
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
SettableFuture<Void> futureToSet = SettableFuture.create();
|
||||
String scope = attributesRequestMsg.getScope();
|
||||
ListenableFuture<List<AttributeKvEntry>> findAttrFuture = attributesService.findAll(tenantId, entityId, scope);
|
||||
Futures.addCallback(findAttrFuture, new FutureCallback<List<AttributeKvEntry>>() {
|
||||
Futures.addCallback(findAttrFuture, new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable List<AttributeKvEntry> ssAttributes) {
|
||||
if (ssAttributes != null && !ssAttributes.isEmpty()) {
|
||||
if (ssAttributes == null || ssAttributes.isEmpty()) {
|
||||
log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId,
|
||||
edge.getName(),
|
||||
entityId.getEntityType(),
|
||||
entityId.getId());
|
||||
futureToSet.set(null);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
Map<String, Object> entityData = new HashMap<>();
|
||||
ObjectNode attributes = mapper.createObjectNode();
|
||||
@ -163,37 +174,35 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
||||
entityData.put("scope", scope);
|
||||
JsonNode body = mapper.valueToTree(entityData);
|
||||
log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, body);
|
||||
saveEdgeEvent(tenantId,
|
||||
edge.getId(),
|
||||
type,
|
||||
EdgeEventActionType.ATTRIBUTES_UPDATED,
|
||||
entityId,
|
||||
body);
|
||||
} catch (Exception e) {
|
||||
log.error("[{}] Failed to save attribute updates to the edge", edge.getName(), e);
|
||||
futureToSet.setException(new RuntimeException("[" + edge.getName() + "] Failed to send attribute updates to the edge", e));
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId,
|
||||
edge.getName(),
|
||||
entityId.getEntityType(),
|
||||
entityId.getId());
|
||||
}
|
||||
ListenableFuture<Void> future = saveEdgeEvent(tenantId, edge.getId(), type, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body);
|
||||
Futures.addCallback(future, new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Void unused) {
|
||||
futureToSet.set(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable throwable) {
|
||||
String errMsg = String.format("[%s] Failed to save edge event [%s]", edge.getId(), attributesRequestMsg);
|
||||
log.error(errMsg, throwable);
|
||||
futureToSet.setException(new RuntimeException(errMsg, throwable));
|
||||
}
|
||||
}, dbCallbackExecutorService);
|
||||
} catch (Exception e) {
|
||||
String errMsg = String.format("[%s] Failed to save attribute updates to the edge [%s]", edge.getId(), attributesRequestMsg);
|
||||
log.error(errMsg, e);
|
||||
futureToSet.setException(new RuntimeException(errMsg, e));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.error("Can't find attributes [{}]", attributesRequestMsg, t);
|
||||
futureToSet.setException(t);
|
||||
String errMsg = String.format("[%s] Can't find attributes [%s]", edge.getId(), attributesRequestMsg);
|
||||
log.error(errMsg, t);
|
||||
futureToSet.setException(new RuntimeException(errMsg, t));
|
||||
}
|
||||
}, dbCallbackExecutorService);
|
||||
return futureToSet;
|
||||
} else {
|
||||
log.warn("[{}] Type doesn't supported {}", tenantId, entityId.getEntityType());
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -208,33 +217,49 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
||||
futures.add(findRelationByQuery(tenantId, edge, entityId, EntitySearchDirection.TO));
|
||||
ListenableFuture<List<List<EntityRelation>>> relationsListFuture = Futures.allAsList(futures);
|
||||
SettableFuture<Void> futureToSet = SettableFuture.create();
|
||||
Futures.addCallback(relationsListFuture, new FutureCallback<List<List<EntityRelation>>>() {
|
||||
Futures.addCallback(relationsListFuture, new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable List<List<EntityRelation>> relationsList) {
|
||||
try {
|
||||
if (relationsList != null && !relationsList.isEmpty()) {
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
for (List<EntityRelation> entityRelations : relationsList) {
|
||||
log.trace("[{}] [{}] [{}] relation(s) are going to be pushed to edge.", edge.getId(), entityId, entityRelations.size());
|
||||
for (EntityRelation relation : entityRelations) {
|
||||
try {
|
||||
if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) &&
|
||||
!relation.getTo().getEntityType().equals(EntityType.EDGE)) {
|
||||
saveEdgeEvent(tenantId,
|
||||
futures.add(saveEdgeEvent(tenantId,
|
||||
edge.getId(),
|
||||
EdgeEventType.RELATION,
|
||||
EdgeEventActionType.ADDED,
|
||||
null,
|
||||
mapper.valueToTree(relation));
|
||||
mapper.valueToTree(relation)));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Exception during loading relation [{}] to edge on sync!", relation, e);
|
||||
futureToSet.setException(e);
|
||||
String errMsg = String.format("[%s] Exception during loading relation [%s] to edge on sync!", edge.getId(), relation);
|
||||
log.error(errMsg, e);
|
||||
futureToSet.setException(new RuntimeException(errMsg, e));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable List<Void> voids) {
|
||||
futureToSet.set(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable throwable) {
|
||||
String errMsg = String.format("[%s] Exception during saving edge events [%s]!", edge.getId(), relationRequestMsg);
|
||||
log.error(errMsg, throwable);
|
||||
futureToSet.setException(new RuntimeException(errMsg, throwable));
|
||||
}
|
||||
}, dbCallbackExecutorService);
|
||||
} else {
|
||||
futureToSet.set(null);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Exception during loading relation(s) to edge on sync!", e);
|
||||
futureToSet.setException(e);
|
||||
@ -243,8 +268,9 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.error("[{}] Can't find relation by query. Entity id [{}]", tenantId, entityId, t);
|
||||
futureToSet.setException(t);
|
||||
String errMsg = String.format("[%s] Can't find relation by query. Entity id [%s]!", tenantId, entityId);
|
||||
log.error(errMsg, t);
|
||||
futureToSet.setException(new RuntimeException(errMsg, t));
|
||||
}
|
||||
}, dbCallbackExecutorService);
|
||||
return futureToSet;
|
||||
@ -260,40 +286,42 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
||||
@Override
|
||||
public ListenableFuture<Void> processDeviceCredentialsRequestMsg(TenantId tenantId, Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg) {
|
||||
log.trace("[{}] processDeviceCredentialsRequestMsg [{}][{}]", tenantId, edge.getName(), deviceCredentialsRequestMsg);
|
||||
if (deviceCredentialsRequestMsg.getDeviceIdMSB() != 0 && deviceCredentialsRequestMsg.getDeviceIdLSB() != 0) {
|
||||
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsRequestMsg.getDeviceIdMSB(), deviceCredentialsRequestMsg.getDeviceIdLSB()));
|
||||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE,
|
||||
EdgeEventActionType.CREDENTIALS_UPDATED, deviceId, null);
|
||||
}
|
||||
if (deviceCredentialsRequestMsg.getDeviceIdMSB() == 0 || deviceCredentialsRequestMsg.getDeviceIdLSB() == 0) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsRequestMsg.getDeviceIdMSB(), deviceCredentialsRequestMsg.getDeviceIdLSB()));
|
||||
return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE,
|
||||
EdgeEventActionType.CREDENTIALS_UPDATED, deviceId, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> processUserCredentialsRequestMsg(TenantId tenantId, Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg) {
|
||||
log.trace("[{}] processUserCredentialsRequestMsg [{}][{}]", tenantId, edge.getName(), userCredentialsRequestMsg);
|
||||
if (userCredentialsRequestMsg.getUserIdMSB() != 0 && userCredentialsRequestMsg.getUserIdLSB() != 0) {
|
||||
UserId userId = new UserId(new UUID(userCredentialsRequestMsg.getUserIdMSB(), userCredentialsRequestMsg.getUserIdLSB()));
|
||||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER,
|
||||
EdgeEventActionType.CREDENTIALS_UPDATED, userId, null);
|
||||
}
|
||||
if (userCredentialsRequestMsg.getUserIdMSB() == 0 || userCredentialsRequestMsg.getUserIdLSB() == 0) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
UserId userId = new UserId(new UUID(userCredentialsRequestMsg.getUserIdMSB(), userCredentialsRequestMsg.getUserIdLSB()));
|
||||
return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER,
|
||||
EdgeEventActionType.CREDENTIALS_UPDATED, userId, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> processDeviceProfileDevicesRequestMsg(TenantId tenantId, Edge edge, DeviceProfileDevicesRequestMsg deviceProfileDevicesRequestMsg) {
|
||||
log.trace("[{}] processDeviceProfileDevicesRequestMsg [{}][{}]", tenantId, edge.getName(), deviceProfileDevicesRequestMsg);
|
||||
if (deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB() != 0 && deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB() != 0) {
|
||||
DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB(), deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB()));
|
||||
DeviceProfile deviceProfileById = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId);
|
||||
if (deviceProfileById != null) {
|
||||
syncDevices(tenantId, edge, deviceProfileById.getName());
|
||||
}
|
||||
}
|
||||
if (deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB() == 0 || deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB() == 0) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB(), deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB()));
|
||||
DeviceProfile deviceProfileById = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId);
|
||||
if (deviceProfileById == null) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
return syncDevices(tenantId, edge, deviceProfileById.getName());
|
||||
}
|
||||
|
||||
private void syncDevices(TenantId tenantId, Edge edge, String deviceType) {
|
||||
private ListenableFuture<Void> syncDevices(TenantId tenantId, Edge edge, String deviceType) {
|
||||
log.trace("[{}] syncDevices [{}][{}]", tenantId, edge.getName(), deviceType);
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
try {
|
||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||
PageData<Device> pageData;
|
||||
@ -302,7 +330,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
||||
log.trace("[{}] [{}] device(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
|
||||
for (Device device : pageData.getData()) {
|
||||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null);
|
||||
futures.add(saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null));
|
||||
}
|
||||
if (pageData.hasNext()) {
|
||||
pageLink = pageLink.nextPageLink();
|
||||
@ -312,25 +340,26 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
||||
} catch (Exception e) {
|
||||
log.error("Exception during loading edge device(s) on sync!", e);
|
||||
}
|
||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> processWidgetBundleTypesRequestMsg(TenantId tenantId, Edge edge,
|
||||
WidgetBundleTypesRequestMsg widgetBundleTypesRequestMsg) {
|
||||
log.trace("[{}] processWidgetBundleTypesRequestMsg [{}][{}]", tenantId, edge.getName(), widgetBundleTypesRequestMsg);
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
if (widgetBundleTypesRequestMsg.getWidgetBundleIdMSB() != 0 && widgetBundleTypesRequestMsg.getWidgetBundleIdLSB() != 0) {
|
||||
WidgetsBundleId widgetsBundleId = new WidgetsBundleId(new UUID(widgetBundleTypesRequestMsg.getWidgetBundleIdMSB(), widgetBundleTypesRequestMsg.getWidgetBundleIdLSB()));
|
||||
WidgetsBundle widgetsBundleById = widgetsBundleService.findWidgetsBundleById(tenantId, widgetsBundleId);
|
||||
if (widgetsBundleById != null) {
|
||||
List<WidgetType> widgetTypesToPush =
|
||||
widgetTypeService.findWidgetTypesByTenantIdAndBundleAlias(widgetsBundleById.getTenantId(), widgetsBundleById.getAlias());
|
||||
|
||||
for (WidgetType widgetType : widgetTypesToPush) {
|
||||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGET_TYPE, EdgeEventActionType.ADDED, widgetType.getId(), null);
|
||||
futures.add(saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGET_TYPE, EdgeEventActionType.ADDED, widgetType.getId(), null));
|
||||
}
|
||||
}
|
||||
}
|
||||
return Futures.immediateFuture(null);
|
||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -343,46 +372,35 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
||||
Futures.addCallback(entityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId), new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable List<EntityView> entityViews) {
|
||||
try {
|
||||
if (entityViews != null && !entityViews.isEmpty()) {
|
||||
List<ListenableFuture<Boolean>> futures = new ArrayList<>();
|
||||
if (entityViews == null || entityViews.isEmpty()) {
|
||||
futureToSet.set(null);
|
||||
return;
|
||||
}
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
for (EntityView entityView : entityViews) {
|
||||
ListenableFuture<Boolean> future = relationService.checkRelation(tenantId, edge.getId(), entityView.getId(),
|
||||
EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
|
||||
futures.add(future);
|
||||
Futures.addCallback(future, new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Boolean result) {
|
||||
futures.add(Futures.transformAsync(future, result -> {
|
||||
if (Boolean.TRUE.equals(result)) {
|
||||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ENTITY_VIEW,
|
||||
return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ENTITY_VIEW,
|
||||
EdgeEventActionType.ADDED, entityView.getId(), null);
|
||||
} else {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
// Do nothing - error handles in allAsList
|
||||
}
|
||||
}, dbCallbackExecutorService);
|
||||
}, dbCallbackExecutorService));
|
||||
}
|
||||
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable List<Boolean> result) {
|
||||
public void onSuccess(@Nullable List<Void> result) {
|
||||
futureToSet.set(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.error("Exception during loading relation [{}] to edge on sync!", t, t);
|
||||
log.error("Exception during loading relation to edge on sync!", t);
|
||||
futureToSet.setException(t);
|
||||
}
|
||||
}, dbCallbackExecutorService);
|
||||
} else {
|
||||
futureToSet.set(null);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Exception during loading relation(s) to edge on sync!", e);
|
||||
futureToSet.setException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -394,7 +412,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
||||
return futureToSet;
|
||||
}
|
||||
|
||||
private void saveEdgeEvent(TenantId tenantId,
|
||||
private ListenableFuture<Void> saveEdgeEvent(TenantId tenantId,
|
||||
EdgeId edgeId,
|
||||
EdgeEventType type,
|
||||
EdgeEventActionType action,
|
||||
@ -405,17 +423,9 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
||||
|
||||
EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body);
|
||||
|
||||
Futures.addCallback(edgeEventService.saveAsync(edgeEvent), new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Void unused) {
|
||||
return Futures.transform(edgeEventService.saveAsync(edgeEvent), unused -> {
|
||||
tbClusterService.onEdgeEventUpdate(tenantId, edgeId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
String errMsg = String.format("Failed to save edge event. edge event [%s]", edgeEvent);
|
||||
log.warn(errMsg, t);
|
||||
}
|
||||
return null;
|
||||
}, dbCallbackExecutorService);
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user