Ack cloud messages

This commit is contained in:
Volodymyr Babak 2020-08-18 17:52:11 +03:00
parent ab14bcb963
commit 3e42f00cf7
6 changed files with 474 additions and 441 deletions

View File

@ -316,64 +316,62 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
}
@Override
public void processRuleChainMetadataRequestMsg(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg) {
public ListenableFuture<Void> processRuleChainMetadataRequestMsg(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg) {
if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) {
RuleChainId ruleChainId = new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB()));
saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.RULE_CHAIN_METADATA, ActionType.ADDED, ruleChainId, null);
ListenableFuture<EdgeEvent> future = saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.RULE_CHAIN_METADATA, ActionType.ADDED, ruleChainId, null);
return Futures.transform(future, edgeEvent -> null, dbCallbackExecutorService);
}
return Futures.immediateFuture(null);
}
@Override
public void processAttributesRequestMsg(Edge edge, AttributesRequestMsg attributesRequestMsg) {
public ListenableFuture<Void> processAttributesRequestMsg(Edge edge, AttributesRequestMsg attributesRequestMsg) {
EntityId entityId = EntityIdFactory.getByTypeAndUuid(
EntityType.valueOf(attributesRequestMsg.getEntityType()),
new UUID(attributesRequestMsg.getEntityIdMSB(), attributesRequestMsg.getEntityIdLSB()));
final EdgeEventType edgeEventType = getEdgeQueueTypeByEntityType(entityId.getEntityType());
if (edgeEventType != null) {
ListenableFuture<List<AttributeKvEntry>> ssAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, DataConstants.SERVER_SCOPE);
Futures.addCallback(ssAttrFuture, new FutureCallback<List<AttributeKvEntry>>() {
@Override
public void onSuccess(@Nullable List<AttributeKvEntry> ssAttributes) {
if (ssAttributes != null && !ssAttributes.isEmpty()) {
try {
Map<String, Object> entityData = new HashMap<>();
ObjectNode attributes = mapper.createObjectNode();
for (AttributeKvEntry attr : ssAttributes) {
if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) {
attributes.put(attr.getKey(), attr.getBooleanValue().get());
} else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) {
attributes.put(attr.getKey(), attr.getDoubleValue().get());
} else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) {
attributes.put(attr.getKey(), attr.getLongValue().get());
} else {
attributes.put(attr.getKey(), attr.getValueAsString());
}
return Futures.transform(ssAttrFuture, ssAttributes -> {
if (ssAttributes != null && !ssAttributes.isEmpty()) {
try {
Map<String, Object> entityData = new HashMap<>();
ObjectNode attributes = mapper.createObjectNode();
for (AttributeKvEntry attr : ssAttributes) {
if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) {
attributes.put(attr.getKey(), attr.getBooleanValue().get());
} else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) {
attributes.put(attr.getKey(), attr.getDoubleValue().get());
} else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) {
attributes.put(attr.getKey(), attr.getLongValue().get());
} else {
attributes.put(attr.getKey(), attr.getValueAsString());
}
entityData.put("kv", attributes);
entityData.put("scope", DataConstants.SERVER_SCOPE);
JsonNode entityBody = mapper.valueToTree(entityData);
log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, entityBody);
saveEdgeEvent(edge.getTenantId(),
edge.getId(),
edgeEventType,
ActionType.ATTRIBUTES_UPDATED,
entityId,
entityBody);
} catch (Exception e) {
log.error("[{}] Failed to send attribute updates to the edge", edge.getName(), e);
}
entityData.put("kv", attributes);
entityData.put("scope", DataConstants.SERVER_SCOPE);
JsonNode entityBody = mapper.valueToTree(entityData);
log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, entityBody);
saveEdgeEvent(edge.getTenantId(),
edge.getId(),
edgeEventType,
ActionType.ATTRIBUTES_UPDATED,
entityId,
entityBody);
} catch (Exception e) {
log.error("[{}] Failed to send attribute updates to the edge", edge.getName(), e);
throw new RuntimeException("[" + edge.getName() + "] Failed to send attribute updates to the edge", e);
}
}
@Override
public void onFailure(Throwable t) {
}
return null;
}, dbCallbackExecutorService);
// TODO: voba - push shared attributes to edge?
ListenableFuture<List<AttributeKvEntry>> shAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, DataConstants.SHARED_SCOPE);
ListenableFuture<List<AttributeKvEntry>> clAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, DataConstants.CLIENT_SCOPE);
// ListenableFuture<List<AttributeKvEntry>> shAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, DataConstants.SHARED_SCOPE);
// ListenableFuture<List<AttributeKvEntry>> clAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, DataConstants.CLIENT_SCOPE);
} else {
return Futures.immediateFuture(null);
}
}
@ -391,7 +389,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
}
@Override
public void processRelationRequestMsg(Edge edge, RelationRequestMsg relationRequestMsg) {
public ListenableFuture<Void> processRelationRequestMsg(Edge edge, RelationRequestMsg relationRequestMsg) {
EntityId entityId = EntityIdFactory.getByTypeAndUuid(
EntityType.valueOf(relationRequestMsg.getEntityType()),
new UUID(relationRequestMsg.getEntityIdMSB(), relationRequestMsg.getEntityIdLSB()));
@ -400,39 +398,33 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
futures.add(findRelationByQuery(edge, entityId, EntitySearchDirection.FROM));
futures.add(findRelationByQuery(edge, entityId, EntitySearchDirection.TO));
ListenableFuture<List<List<EntityRelation>>> relationsListFuture = Futures.allAsList(futures);
Futures.addCallback(relationsListFuture, new FutureCallback<List<List<EntityRelation>>>() {
@Override
public void onSuccess(@Nullable List<List<EntityRelation>> relationsList) {
try {
if (!relationsList.isEmpty()) {
for (List<EntityRelation> entityRelations : relationsList) {
log.trace("[{}] [{}] [{}] relation(s) are going to be pushed to edge.", edge.getId(), entityId, entityRelations.size());
for (EntityRelation relation : entityRelations) {
try {
if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) &&
!relation.getTo().getEntityType().equals(EntityType.EDGE)) {
saveEdgeEvent(edge.getTenantId(),
edge.getId(),
EdgeEventType.RELATION,
ActionType.ADDED,
null,
mapper.valueToTree(relation));
}
} catch (Exception e) {
log.error("Exception during loading relation [{}] to edge on sync!", relation, e);
return Futures.transform(relationsListFuture, relationsList -> {
try {
if (relationsList != null && !relationsList.isEmpty()) {
for (List<EntityRelation> entityRelations : relationsList) {
log.trace("[{}] [{}] [{}] relation(s) are going to be pushed to edge.", edge.getId(), entityId, entityRelations.size());
for (EntityRelation relation : entityRelations) {
try {
if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) &&
!relation.getTo().getEntityType().equals(EntityType.EDGE)) {
saveEdgeEvent(edge.getTenantId(),
edge.getId(),
EdgeEventType.RELATION,
ActionType.ADDED,
null,
mapper.valueToTree(relation));
}
} catch (Exception e) {
log.error("Exception during loading relation [{}] to edge on sync!", relation, e);
}
}
}
} catch (Exception e) {
log.error("Exception during loading relation(s) to edge on sync!", e);
}
} catch (Exception e) {
log.error("Exception during loading relation(s) to edge on sync!", e);
throw new RuntimeException("Exception during loading relation(s) to edge on sync!", e);
}
@Override
public void onFailure(Throwable t) {
log.error("Exception during loading relation(s) to edge on sync!", t);
}
return null;
}, dbCallbackExecutorService);
}
@ -443,22 +435,26 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
}
@Override
public void processDeviceCredentialsRequestMsg(Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg) {
public ListenableFuture<Void> processDeviceCredentialsRequestMsg(Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg) {
if (deviceCredentialsRequestMsg.getDeviceIdMSB() != 0 && deviceCredentialsRequestMsg.getDeviceIdLSB() != 0) {
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsRequestMsg.getDeviceIdMSB(), deviceCredentialsRequestMsg.getDeviceIdLSB()));
saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.DEVICE, ActionType.CREDENTIALS_UPDATED, deviceId, null);
ListenableFuture<EdgeEvent> future = saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.DEVICE, ActionType.CREDENTIALS_UPDATED, deviceId, null);
return Futures.transform(future, edgeEvent -> null, dbCallbackExecutorService);
}
return Futures.immediateFuture(null);
}
@Override
public void processUserCredentialsRequestMsg(Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg) {
public ListenableFuture<Void> processUserCredentialsRequestMsg(Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg) {
if (userCredentialsRequestMsg.getUserIdMSB() != 0 && userCredentialsRequestMsg.getUserIdLSB() != 0) {
UserId userId = new UserId(new UUID(userCredentialsRequestMsg.getUserIdMSB(), userCredentialsRequestMsg.getUserIdLSB()));
saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, ActionType.CREDENTIALS_UPDATED, userId, null);
ListenableFuture<EdgeEvent> future = saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, ActionType.CREDENTIALS_UPDATED, userId, null);
return Futures.transform(future, edgeEvent -> null, dbCallbackExecutorService);
}
return Futures.immediateFuture(null);
}
private void saveEdgeEvent(TenantId tenantId,
private ListenableFuture<EdgeEvent> saveEdgeEvent(TenantId tenantId,
EdgeId edgeId,
EdgeEventType edgeEventType,
ActionType edgeEventAction,
@ -476,6 +472,6 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
edgeEvent.setEntityId(entityId.getId());
}
edgeEvent.setEntityBody(entityBody);
edgeEventService.saveAsync(edgeEvent);
return edgeEventService.saveAsync(edgeEvent);
}
}

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.service.edge.rpc.init;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.gen.edge.AttributesRequestMsg;
import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg;
@ -26,13 +27,13 @@ public interface SyncEdgeService {
void sync(Edge edge);
void processRuleChainMetadataRequestMsg(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg);
ListenableFuture<Void> processRuleChainMetadataRequestMsg(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg);
void processAttributesRequestMsg(Edge edge, AttributesRequestMsg attributesRequestMsg);
ListenableFuture<Void> processAttributesRequestMsg(Edge edge, AttributesRequestMsg attributesRequestMsg);
void processRelationRequestMsg(Edge edge, RelationRequestMsg relationRequestMsg);
ListenableFuture<Void> processRelationRequestMsg(Edge edge, RelationRequestMsg relationRequestMsg);
void processDeviceCredentialsRequestMsg(Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg);
ListenableFuture<Void> processDeviceCredentialsRequestMsg(Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg);
void processUserCredentialsRequestMsg(Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg);
ListenableFuture<Void> processUserCredentialsRequestMsg(Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg);
}

View File

@ -28,9 +28,9 @@ import org.thingsboard.server.gen.edge.ConnectRequestMsg;
import org.thingsboard.server.gen.edge.ConnectResponseCode;
import org.thingsboard.server.gen.edge.ConnectResponseMsg;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.DownlinkResponseMsg;
import org.thingsboard.server.gen.edge.EdgeConfiguration;
import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc;
import org.thingsboard.server.gen.edge.EntityUpdateMsg;
import org.thingsboard.server.gen.edge.RequestMsg;
import org.thingsboard.server.gen.edge.RequestMsgType;
import org.thingsboard.server.gen.edge.ResponseMsg;
@ -41,6 +41,7 @@ import javax.net.ssl.SSLException;
import java.io.File;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@Service
@ -62,12 +63,13 @@ public class EdgeGrpcClient implements EdgeRpcClient {
private StreamObserver<RequestMsg> inputStream;
private static final ReentrantLock uplinkMsgLock = new ReentrantLock();
@Override
public void connect(String edgeKey,
String edgeSecret,
Consumer<UplinkResponseMsg> onUplinkResponse,
Consumer<EdgeConfiguration> onEdgeUpdate,
Consumer<EntityUpdateMsg> onEntityUpdate,
Consumer<DownlinkMsg> onDownlink,
Consumer<Exception> onError) {
NettyChannelBuilder builder = NettyChannelBuilder.forAddress(rpcHost, rpcPort).usePlaintext();
@ -83,7 +85,7 @@ public class EdgeGrpcClient implements EdgeRpcClient {
channel = builder.build();
EdgeRpcServiceGrpc.EdgeRpcServiceStub stub = EdgeRpcServiceGrpc.newStub(channel);
log.info("[{}] Sending a connect request to the TB!", edgeKey);
this.inputStream = stub.handleMsgs(initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onEntityUpdate, onDownlink, onError));
this.inputStream = stub.handleMsgs(initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onDownlink, onError));
this.inputStream.onNext(RequestMsg.newBuilder()
.setMsgType(RequestMsgType.CONNECT_RPC_MESSAGE)
.setConnectRequestMsg(ConnectRequestMsg.newBuilder().setEdgeRoutingKey(edgeKey).setEdgeSecret(edgeSecret).build())
@ -110,16 +112,33 @@ public class EdgeGrpcClient implements EdgeRpcClient {
@Override
public void sendUplinkMsg(UplinkMsg msg) {
this.inputStream.onNext(RequestMsg.newBuilder()
.setMsgType(RequestMsgType.UPLINK_RPC_MESSAGE)
.setUplinkMsg(msg)
.build());
try {
uplinkMsgLock.lock();
this.inputStream.onNext(RequestMsg.newBuilder()
.setMsgType(RequestMsgType.UPLINK_RPC_MESSAGE)
.setUplinkMsg(msg)
.build());
} finally {
uplinkMsgLock.unlock();
}
}
@Override
public void sendDownlinkResponseMsg(DownlinkResponseMsg downlinkResponseMsg) {
try {
uplinkMsgLock.lock();
this.inputStream.onNext(RequestMsg.newBuilder()
.setMsgType(RequestMsgType.UPLINK_RPC_MESSAGE)
.setDownlinkResponseMsg(downlinkResponseMsg)
.build());
} finally {
uplinkMsgLock.unlock();
}
}
private StreamObserver<ResponseMsg> initOutputStream(String edgeKey,
Consumer<UplinkResponseMsg> onUplinkResponse,
Consumer<EdgeConfiguration> onEdgeUpdate,
Consumer<EntityUpdateMsg> onEntityUpdate,
Consumer<DownlinkMsg> onDownlink,
Consumer<Exception> onError) {
return new StreamObserver<ResponseMsg>() {
@ -137,9 +156,6 @@ public class EdgeGrpcClient implements EdgeRpcClient {
} else if (responseMsg.hasUplinkResponseMsg()) {
log.debug("[{}] Uplink response message received {}", edgeKey, responseMsg.getUplinkResponseMsg());
onUplinkResponse.accept(responseMsg.getUplinkResponseMsg());
} else if (responseMsg.hasEntityUpdateMsg()) {
log.debug("[{}] Entity update message received {}", edgeKey, responseMsg.getEntityUpdateMsg());
onEntityUpdate.accept(responseMsg.getEntityUpdateMsg());
} else if (responseMsg.hasDownlinkMsg()) {
log.debug("[{}] Downlink message received {}", edgeKey, responseMsg.getDownlinkMsg());
onDownlink.accept(responseMsg.getDownlinkMsg());

View File

@ -16,8 +16,8 @@
package org.thingsboard.edge.rpc;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.DownlinkResponseMsg;
import org.thingsboard.server.gen.edge.EdgeConfiguration;
import org.thingsboard.server.gen.edge.EntityUpdateMsg;
import org.thingsboard.server.gen.edge.UplinkMsg;
import org.thingsboard.server.gen.edge.UplinkResponseMsg;
@ -29,11 +29,12 @@ public interface EdgeRpcClient {
String integrationSecret,
Consumer<UplinkResponseMsg> onUplinkResponse,
Consumer<EdgeConfiguration> onEdgeUpdate,
Consumer<EntityUpdateMsg> onEntityUpdate,
Consumer<DownlinkMsg> onDownlink,
Consumer<Exception> onError);
void disconnect() throws InterruptedException;
void sendUplinkMsg(UplinkMsg uplinkMsg) throws InterruptedException;
void sendUplinkMsg(UplinkMsg uplinkMsg);
void sendDownlinkResponseMsg(DownlinkResponseMsg downlinkResponseMsg);
}

View File

@ -37,31 +37,13 @@ message RequestMsg {
RequestMsgType msgType = 1;
ConnectRequestMsg connectRequestMsg = 2;
UplinkMsg uplinkMsg = 3;
DownlinkResponseMsg downlinkResponseMsg = 4;
}
message ResponseMsg {
ConnectResponseMsg connectResponseMsg = 1;
UplinkResponseMsg uplinkResponseMsg = 2;
EntityUpdateMsg entityUpdateMsg = 3;
DownlinkMsg downlinkMsg = 4;
}
message EntityUpdateMsg {
DeviceUpdateMsg deviceUpdateMsg = 1;
DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = 2;
RuleChainUpdateMsg ruleChainUpdateMsg = 3;
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 4;
DashboardUpdateMsg dashboardUpdateMsg = 5;
AssetUpdateMsg assetUpdateMsg = 6;
EntityViewUpdateMsg entityViewUpdateMsg = 7;
AlarmUpdateMsg alarmUpdateMsg = 8;
UserUpdateMsg userUpdateMsg = 9;
UserCredentialsUpdateMsg userCredentialsUpdateMsg = 10;
CustomerUpdateMsg customerUpdateMsg = 11;
RelationUpdateMsg relationUpdateMsg = 12;
WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = 13;
WidgetTypeUpdateMsg widgetTypeUpdateMsg = 14;
AdminSettingsUpdateMsg adminSettingsUpdateMsg = 15;
DownlinkMsg downlinkMsg = 3;
}
enum RequestMsgType {
@ -360,9 +342,30 @@ message UplinkResponseMsg {
string errorMsg = 2;
}
message DownlinkResponseMsg {
bool success = 1;
string errorMsg = 2;
}
message DownlinkMsg {
int32 downlinkMsgId = 1;
repeated EntityDataProto entityData = 2;
repeated DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 3;
repeated DeviceUpdateMsg deviceUpdateMsg = 4;
repeated DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = 5;
repeated RuleChainUpdateMsg ruleChainUpdateMsg = 6;
repeated RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 7;
repeated DashboardUpdateMsg dashboardUpdateMsg = 8;
repeated AssetUpdateMsg assetUpdateMsg = 9;
repeated EntityViewUpdateMsg entityViewUpdateMsg = 10;
repeated AlarmUpdateMsg alarmUpdateMsg = 11;
repeated UserUpdateMsg userUpdateMsg = 12;
repeated UserCredentialsUpdateMsg userCredentialsUpdateMsg = 13;
repeated CustomerUpdateMsg customerUpdateMsg = 14;
repeated RelationUpdateMsg relationUpdateMsg = 15;
repeated WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = 16;
repeated WidgetTypeUpdateMsg widgetTypeUpdateMsg = 17;
repeated AdminSettingsUpdateMsg adminSettingsUpdateMsg = 18;
}