Added correct future callbacks

This commit is contained in:
Volodymyr Babak 2020-09-17 18:42:42 +03:00
parent 4bb56df51a
commit fc8b138cfd
6 changed files with 110 additions and 43 deletions

View File

@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
@ -385,12 +386,25 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
@Override
public ListenableFuture<Void> processRuleChainMetadataRequestMsg(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg) {
SettableFuture<Void> futureToSet = SettableFuture.create();
if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) {
RuleChainId ruleChainId = new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB()));
RuleChainId ruleChainId =
new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB()));
ListenableFuture<EdgeEvent> future = saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.RULE_CHAIN_METADATA, ActionType.ADDED, ruleChainId, null);
return Futures.transform(future, edgeEvent -> null, dbCallbackExecutorService);
Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
@Override
public void onSuccess(@Nullable EdgeEvent result) {
futureToSet.set(null);
}
@Override
public void onFailure(Throwable t) {
log.error("Can't save edge event [{}]", ruleChainMetadataRequestMsg, t);
futureToSet.setException(t);
}
}, dbCallbackExecutorService);
}
return Futures.immediateFuture(null);
return futureToSet;
}
@Override
@ -400,41 +414,51 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
new UUID(attributesRequestMsg.getEntityIdMSB(), attributesRequestMsg.getEntityIdLSB()));
final EdgeEventType edgeEventType = getEdgeQueueTypeByEntityType(entityId.getEntityType());
if (edgeEventType != null) {
SettableFuture<Void> futureToSet = SettableFuture.create();
ListenableFuture<List<AttributeKvEntry>> ssAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, DataConstants.SERVER_SCOPE);
return Futures.transform(ssAttrFuture, ssAttributes -> {
if (ssAttributes != null && !ssAttributes.isEmpty()) {
try {
Map<String, Object> entityData = new HashMap<>();
ObjectNode attributes = mapper.createObjectNode();
for (AttributeKvEntry attr : ssAttributes) {
if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) {
attributes.put(attr.getKey(), attr.getBooleanValue().get());
} else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) {
attributes.put(attr.getKey(), attr.getDoubleValue().get());
} else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) {
attributes.put(attr.getKey(), attr.getLongValue().get());
} else {
attributes.put(attr.getKey(), attr.getValueAsString());
Futures.addCallback(ssAttrFuture, new FutureCallback<List<AttributeKvEntry>>() {
@Override
public void onSuccess(@Nullable List<AttributeKvEntry> ssAttributes) {
if (ssAttributes != null && !ssAttributes.isEmpty()) {
try {
Map<String, Object> entityData = new HashMap<>();
ObjectNode attributes = mapper.createObjectNode();
for (AttributeKvEntry attr : ssAttributes) {
if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) {
attributes.put(attr.getKey(), attr.getBooleanValue().get());
} else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) {
attributes.put(attr.getKey(), attr.getDoubleValue().get());
} else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) {
attributes.put(attr.getKey(), attr.getLongValue().get());
} else {
attributes.put(attr.getKey(), attr.getValueAsString());
}
}
entityData.put("kv", attributes);
entityData.put("scope", DataConstants.SERVER_SCOPE);
JsonNode entityBody = mapper.valueToTree(entityData);
log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, entityBody);
saveEdgeEvent(edge.getTenantId(),
edge.getId(),
edgeEventType,
ActionType.ATTRIBUTES_UPDATED,
entityId,
entityBody);
} catch (Exception e) {
log.error("[{}] Failed to send attribute updates to the edge", edge.getName(), e);
throw new RuntimeException("[" + edge.getName() + "] Failed to send attribute updates to the edge", e);
}
}
futureToSet.set(null);
}
entityData.put("kv", attributes);
entityData.put("scope", DataConstants.SERVER_SCOPE);
JsonNode entityBody = mapper.valueToTree(entityData);
log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, entityBody);
saveEdgeEvent(edge.getTenantId(),
edge.getId(),
edgeEventType,
ActionType.ATTRIBUTES_UPDATED,
entityId,
entityBody);
} catch (Exception e) {
log.error("[{}] Failed to send attribute updates to the edge", edge.getName(), e);
throw new RuntimeException("[" + edge.getName() + "] Failed to send attribute updates to the edge", e);
}
}
return null;
}, dbCallbackExecutorService);
@Override
public void onFailure(Throwable t) {
log.error("Can't save attributes [{}]", attributesRequestMsg, t);
futureToSet.setException(t);
}
}, dbCallbackExecutorService);
return futureToSet;
// TODO: voba - push shared attributes to edge?
// ListenableFuture<List<AttributeKvEntry>> shAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, DataConstants.SHARED_SCOPE);
// ListenableFuture<List<AttributeKvEntry>> clAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, DataConstants.CLIENT_SCOPE);
@ -504,22 +528,46 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
@Override
public ListenableFuture<Void> processDeviceCredentialsRequestMsg(Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg) {
SettableFuture<Void> futureToSet = SettableFuture.create();
if (deviceCredentialsRequestMsg.getDeviceIdMSB() != 0 && deviceCredentialsRequestMsg.getDeviceIdLSB() != 0) {
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsRequestMsg.getDeviceIdMSB(), deviceCredentialsRequestMsg.getDeviceIdLSB()));
ListenableFuture<EdgeEvent> future = saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.DEVICE, ActionType.CREDENTIALS_UPDATED, deviceId, null);
return Futures.transform(future, edgeEvent -> null, dbCallbackExecutorService);
Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
@Override
public void onSuccess(@Nullable EdgeEvent result) {
futureToSet.set(null);
}
@Override
public void onFailure(Throwable t) {
log.error("Can't save edge event [{}]", deviceCredentialsRequestMsg, t);
futureToSet.setException(t);
}
}, dbCallbackExecutorService);
}
return Futures.immediateFuture(null);
return futureToSet;
}
@Override
public ListenableFuture<Void> processUserCredentialsRequestMsg(Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg) {
SettableFuture<Void> futureToSet = SettableFuture.create();
if (userCredentialsRequestMsg.getUserIdMSB() != 0 && userCredentialsRequestMsg.getUserIdLSB() != 0) {
UserId userId = new UserId(new UUID(userCredentialsRequestMsg.getUserIdMSB(), userCredentialsRequestMsg.getUserIdLSB()));
ListenableFuture<EdgeEvent> future = saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, ActionType.CREDENTIALS_UPDATED, userId, null);
return Futures.transform(future, edgeEvent -> null, dbCallbackExecutorService);
Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
@Override
public void onSuccess(@Nullable EdgeEvent result) {
futureToSet.set(null);
}
@Override
public void onFailure(Throwable t) {
log.error("Can't save edge event [{}]", userCredentialsRequestMsg, t);
futureToSet.setException(t);
}
}, dbCallbackExecutorService);
}
return Futures.immediateFuture(null);
return futureToSet;
}
private ListenableFuture<EdgeEvent> saveEdgeEvent(TenantId tenantId,

View File

@ -26,9 +26,11 @@ import org.thingsboard.server.common.data.alarm.AlarmStatus;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.gen.edge.AlarmUpdateMsg;
import org.thingsboard.server.queue.util.TbCoreComponent;
@Component
@Slf4j
@TbCoreComponent
public class AlarmProcessor extends BaseProcessor {
public ListenableFuture<Void> onAlarmUpdate(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) {

View File

@ -47,9 +47,6 @@ public abstract class BaseProcessor {
protected static final ObjectMapper mapper = new ObjectMapper();
@Autowired
protected TbRuleEngineDeviceRpcService tbDeviceRpcService;
@Autowired
protected AlarmService alarmService;

View File

@ -19,6 +19,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
@ -46,6 +47,7 @@ import org.thingsboard.server.gen.edge.DeviceRpcCallMsg;
import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
import java.util.UUID;
@ -53,6 +55,7 @@ import java.util.concurrent.locks.ReentrantLock;
@Component
@Slf4j
@TbCoreComponent
public class DeviceProcessor extends BaseProcessor {
private static final ReentrantLock deviceCreationLock = new ReentrantLock();
@ -219,6 +222,7 @@ public class DeviceProcessor extends BaseProcessor {
}
public ListenableFuture<Void> processDeviceRpcCallResponseMsg(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) {
SettableFuture<Void> futureToSet = SettableFuture.create();
UUID uuid = new UUID(deviceRpcCallMsg.getRequestIdMSB(), deviceRpcCallMsg.getRequestIdLSB());
FromDeviceRpcResponse response;
if (!StringUtils.isEmpty(deviceRpcCallMsg.getResponseMsg().getError())) {
@ -226,8 +230,20 @@ public class DeviceProcessor extends BaseProcessor {
} else {
response = new FromDeviceRpcResponse(uuid, deviceRpcCallMsg.getResponseMsg().getResponse(), null);
}
tbDeviceRpcService.sendRpcResponseToTbCore(deviceRpcCallMsg.getOriginServiceId(), response);
return Futures.immediateFuture(null);
TbQueueCallback callback = new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
futureToSet.set(null);
}
@Override
public void onFailure(Throwable t) {
log.error("Can't process push notification to core [{}]", deviceRpcCallMsg, t);
futureToSet.setException(t);
}
};
tbClusterService.pushNotificationToCore(deviceRpcCallMsg.getOriginServiceId(), response, callback);
return futureToSet;
}
}

View File

@ -34,11 +34,13 @@ import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.gen.edge.RelationUpdateMsg;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.UUID;
@Component
@Slf4j
@TbCoreComponent
public class RelationProcessor extends BaseProcessor {
public ListenableFuture<Void> onRelationUpdate(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) {

View File

@ -45,6 +45,7 @@ import org.thingsboard.server.gen.edge.EntityDataProto;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.ArrayList;
import java.util.HashSet;
@ -54,6 +55,7 @@ import java.util.UUID;
@Component
@Slf4j
@TbCoreComponent
public class TelemetryProcessor extends BaseProcessor {
private final Gson gson = new Gson();