diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index b95e1d92f8..10dd1e87ea 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -291,6 +291,9 @@ public class ActorSystemContext { @Getter private long statisticsPersistFrequency; + @Value("${edges.rpc.enabled}") + @Getter + private boolean edgesRpcEnabled; @Scheduled(fixedDelayString = "${actors.statistics.js_print_interval_ms}") public void printStats() { diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 953188f3f7..89379a8bd6 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -87,7 +87,9 @@ public class AppActor extends ContextAwareActor { case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG: + case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: + case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: onToDeviceActorMsg((TenantAwareMsg) msg, true); break; diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java index feecef859f..9662db46c1 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java @@ -17,6 +17,7 @@ package org.thingsboard.server.actors.device; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; +import org.thingsboard.rule.engine.api.msg.DeviceEdgeUpdateMsg; import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; @@ -26,6 +27,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; +import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; @@ -70,12 +72,18 @@ public class DeviceActor extends ContextAwareActor { case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: processor.processRpcRequest(ctx, (ToDeviceRpcRequestActorMsg) msg); break; + case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: + processor.processRpcResponsesFromEdge(ctx, (FromDeviceRpcResponseActorMsg) msg); + break; case DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG: processor.processServerSideRpcTimeout(ctx, (DeviceActorServerSideRpcTimeoutMsg) msg); break; case SESSION_TIMEOUT_MSG: processor.checkSessionsTimeout(); break; + case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG: + processor.processEdgeUpdate((DeviceEdgeUpdateMsg) msg); + break; default: return false; } diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index cf4aa3edda..4589640575 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.actors.device; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -24,17 +25,24 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.thingsboard.rule.engine.api.RpcError; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; +import org.thingsboard.rule.engine.api.msg.DeviceEdgeUpdateMsg; import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -63,6 +71,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; @@ -98,6 +107,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { private String deviceName; private String deviceType; private TbMsgMetaData defaultMetaData; + private EdgeId edgeId; DeviceActorMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) { super(systemContext); @@ -120,12 +130,27 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { this.defaultMetaData = new TbMsgMetaData(); this.defaultMetaData.putValue("deviceName", deviceName); this.defaultMetaData.putValue("deviceType", deviceType); + if (systemContext.isEdgesRpcEnabled()) { + this.edgeId = findRelatedEdgeId(); + } return true; } else { return false; } } + private EdgeId findRelatedEdgeId() { + List result = + systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.EDGE_TYPE, RelationTypeGroup.COMMON); + if (result != null && result.size() > 0) { + EntityRelation relationToEdge = result.get(0); + if (relationToEdge.getFrom() != null && relationToEdge.getFrom().getId() != null) { + return new EdgeId(relationToEdge.getFrom().getId()); + } + } + return null; + } + void processRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg) { ToDeviceRpcRequest request = msg.getMsg(); ToDeviceRpcRequestBody body = request.getBody(); @@ -138,15 +163,21 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { return; } - boolean sent = rpcSubscriptions.size() > 0; - Set syncSessionSet = new HashSet<>(); - rpcSubscriptions.forEach((key, value) -> { - sendToTransport(rpcRequest, key, value.getNodeId()); - if (SessionType.SYNC == value.getType()) { - syncSessionSet.add(key); - } - }); - syncSessionSet.forEach(rpcSubscriptions::remove); + boolean sent; + if (systemContext.isEdgesRpcEnabled() && edgeId != null) { + saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId()); + sent = true; + } else { + sent = rpcSubscriptions.size() > 0; + Set syncSessionSet = new HashSet<>(); + rpcSubscriptions.forEach((key, value) -> { + sendToTransport(rpcRequest, key, value.getNodeId()); + if (SessionType.SYNC == value.getType()) { + syncSessionSet.add(key); + } + }); + syncSessionSet.forEach(rpcSubscriptions::remove); + } if (request.isOneway() && sent) { log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId()); @@ -161,6 +192,17 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } + void processRpcResponsesFromEdge(TbActorCtx context, FromDeviceRpcResponseActorMsg responseMsg) { + log.debug("[{}] Processing rpc command response from edge session", deviceId); + ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); + boolean success = requestMd != null; + if (success) { + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(responseMsg.getMsg()); + } else { + log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId()); + } + } + private void registerPendingRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) { toDeviceRpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent)); DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg(rpcRequest.getRequestId(), timeout); @@ -473,6 +515,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { this.defaultMetaData.putValue("deviceType", deviceType); } + void processEdgeUpdate(DeviceEdgeUpdateMsg msg) { + this.edgeId = msg.getEdgeId(); + } + private void sendToTransport(GetAttributeResponseMsg responseMsg, SessionInfoProto sessionInfo) { ToTransportMsg msg = ToTransportMsg.newBuilder() .setSessionIdMSB(sessionInfo.getSessionIdMSB()) @@ -505,6 +551,25 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { systemContext.getTbCoreToTransportService().process(nodeId, msg); } + private void saveRpcRequestToEdgeQueue(ToDeviceRpcRequest msg, Integer requestId) { + EdgeEvent edgeEvent = new EdgeEvent(); + edgeEvent.setTenantId(tenantId); + edgeEvent.setAction(EdgeEventActionType.RPC_CALL); + edgeEvent.setEntityId(deviceId.getId()); + edgeEvent.setType(EdgeEventType.DEVICE); + + ObjectNode body = mapper.createObjectNode(); + body.put("requestId", requestId); + body.put("requestUUID", msg.getId().toString()); + body.put("oneway", msg.isOneway()); + body.put("expirationTime", msg.getExpirationTime()); + body.put("method", msg.getBody().getMethod()); + body.put("params", msg.getBody().getParams()); + edgeEvent.setBody(body); + + edgeEvent.setEdgeId(edgeId); + systemContext.getEdgeEventService().saveAsync(edgeEvent); + } private List toTsKvProtos(@Nullable List result) { List clientAttributes; diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index fa535a7e9e..0da4769b83 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -148,7 +148,9 @@ public class TenantActor extends RuleChainManagerActor { case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG: + case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: + case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: onToDeviceActorMsg((DeviceAwareMsg) msg, true); break; diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index a86dd65eec..afe69ef0bd 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -223,7 +223,7 @@ public abstract class BaseController { @Value("${edges.rpc.enabled}") @Getter - private boolean edgesSupportEnabled; + private boolean edgesRpcEnabled; @ExceptionHandler(ThingsboardException.class) public void handleThingsboardException(ThingsboardException ex, HttpServletResponse response) { @@ -761,7 +761,7 @@ public abstract class BaseController { } protected void sendNotificationMsgToEdgeService(TenantId tenantId, EdgeId edgeId, CustomerId customerId, EdgeEventActionType action) { - if (!edgesSupportEnabled) { + if (!edgesRpcEnabled) { return; } try { @@ -772,7 +772,7 @@ public abstract class BaseController { } protected void sendNotificationMsgToEdgeService(TenantId tenantId, EntityId entityId, CustomerId customerId, EdgeEventActionType action) { - if (!edgesSupportEnabled) { + if (!edgesRpcEnabled) { return; } EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(entityId.getEntityType()); @@ -786,7 +786,7 @@ public abstract class BaseController { } protected void sendNotificationMsgToEdgeService(TenantId tenantId, EntityRelation relation, EdgeEventActionType action) { - if (!edgesSupportEnabled) { + if (!edgesRpcEnabled) { return; } try { @@ -804,7 +804,7 @@ public abstract class BaseController { } protected void sendNotificationMsgToEdgeService(TenantId tenantId, EdgeId edgeId, EntityId entityId, EdgeEventActionType action) { - if (!edgesSupportEnabled) { + if (!edgesRpcEnabled) { return; } EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(entityId.getEntityType()); diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java index 37b9df8062..e64b3d37d5 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java @@ -32,6 +32,7 @@ import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg; +import org.thingsboard.rule.engine.api.msg.DeviceEdgeUpdateMsg; import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.common.data.ClaimRequest; import org.thingsboard.server.common.data.Customer; @@ -575,6 +576,9 @@ public class DeviceController extends BaseController { Device savedDevice = checkNotNull(deviceService.assignDeviceToEdge(getCurrentUser().getTenantId(), deviceId, edgeId)); + tbClusterService.pushMsgToCore(new DeviceEdgeUpdateMsg(savedDevice.getTenantId(), + savedDevice.getId(), edgeId), null); + logEntityAction(deviceId, savedDevice, savedDevice.getCustomerId(), ActionType.ASSIGNED_TO_EDGE, null, strDeviceId, strEdgeId, edge.getName()); @@ -606,6 +610,9 @@ public class DeviceController extends BaseController { Device savedDevice = checkNotNull(deviceService.unassignDeviceFromEdge(getCurrentUser().getTenantId(), deviceId, edgeId)); + tbClusterService.pushMsgToCore(new DeviceEdgeUpdateMsg(savedDevice.getTenantId(), + savedDevice.getId(), null), null); + logEntityAction(deviceId, device, device.getCustomerId(), ActionType.UNASSIGNED_FROM_EDGE, null, strDeviceId, strEdgeId, edge.getName()); diff --git a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java index c11360827d..47cfb26553 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java @@ -416,7 +416,7 @@ public class EdgeController extends BaseController { public void syncEdge(@RequestBody EdgeId edgeId) throws ThingsboardException { try { edgeId = checkNotNull(edgeId); - if (isEdgesSupportEnabled()) { + if (isEdgesRpcEnabled()) { EdgeGrpcSession session = edgeGrpcService.getEdgeGrpcSessionById(edgeId); Edge edge = session.getEdge(); syncEdgeService.sync(edge); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 4a21c5aac7..384ccafc9c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc; import org.thingsboard.server.gen.edge.RequestMsg; import org.thingsboard.server.gen.edge.ResponseMsg; +import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.edge.EdgeContextComponent; import org.thingsboard.server.service.state.DefaultDeviceStateService; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; @@ -55,6 +56,7 @@ import java.util.concurrent.TimeUnit; @Service @Slf4j @ConditionalOnProperty(prefix = "edges.rpc", value = "enabled", havingValue = "true") +@TbCoreComponent public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService { private final Map sessions = new ConcurrentHashMap<>(); @@ -176,13 +178,15 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i log.trace("No sessions available, sleep for the next run"); try { Thread.sleep(1000); - } catch (InterruptedException ignore) {} + } catch (InterruptedException ignore) { + } } } catch (Exception e) { log.warn("Failed to process messages handling!", e); try { Thread.sleep(1000); - } catch (InterruptedException ignore) {} + } catch (InterruptedException ignore) { + } } } }); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 1dea607dc4..9ae3d4b8d6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -377,7 +377,7 @@ public final class EdgeGrpcSession implements Closeable { private DownlinkMsg processRpcCallMsg(EdgeEvent edgeEvent) { log.trace("Executing processRpcCall, edgeEvent [{}]", edgeEvent); DeviceRpcCallMsg deviceRpcCallMsg = - ctx.getDeviceMsgConstructor().constructDeviceRpcCallMsg(edgeEvent.getBody()); + ctx.getDeviceMsgConstructor().constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody()); return DownlinkMsg.newBuilder() .addAllDeviceRpcCallMsg(Collections.singletonList(deviceRpcCallMsg)) .build(); @@ -398,7 +398,6 @@ public final class EdgeGrpcSession implements Closeable { return downlinkMsg; } - private ListenableFuture getQueueStartTs() { ListenableFuture> future = ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, QUEUE_START_TS_ATTR_KEY); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java index 63f0a55ea3..1f5b360d5b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java @@ -18,7 +18,6 @@ package org.thingsboard.server.service.edge.rpc.constructor; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.stereotype.Component; -import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; @@ -31,6 +30,8 @@ import org.thingsboard.server.gen.edge.RpcRequestMsg; import org.thingsboard.server.gen.edge.UpdateMsgType; import org.thingsboard.server.queue.util.TbCoreComponent; +import java.util.UUID; + @Component @TbCoreComponent public class DeviceMsgConstructor { @@ -78,22 +79,26 @@ public class DeviceMsgConstructor { .setIdLSB(deviceId.getId().getLeastSignificantBits()).build(); } - public DeviceRpcCallMsg constructDeviceRpcCallMsg(JsonNode body) { - RuleEngineDeviceRpcRequest request = mapper.convertValue(body, RuleEngineDeviceRpcRequest.class); + public DeviceRpcCallMsg constructDeviceRpcCallMsg(UUID deviceId, JsonNode body) { + int requestId = body.get("requestId").asInt(); + boolean oneway = body.get("oneway").asBoolean(); + UUID requestUUID = UUID.fromString(body.get("requestUUID").asText()); + long expirationTime = body.get("expirationTime").asLong(); + String method = body.get("method").asText(); + String params = body.get("params").asText(); + RpcRequestMsg.Builder requestBuilder = RpcRequestMsg.newBuilder(); - requestBuilder.setMethod(request.getMethod()); - requestBuilder.setParams(request.getBody()); + requestBuilder.setMethod(method); + requestBuilder.setParams(params); DeviceRpcCallMsg.Builder builder = DeviceRpcCallMsg.newBuilder() - .setDeviceIdMSB(request.getDeviceId().getId().getMostSignificantBits()) - .setDeviceIdLSB(request.getDeviceId().getId().getLeastSignificantBits()) - .setRequestIdMSB(request.getRequestUUID().getMostSignificantBits()) - .setRequestIdLSB(request.getRequestUUID().getLeastSignificantBits()) - .setExpirationTime(request.getExpirationTime()) - .setOneway(request.isOneway()) + .setDeviceIdMSB(deviceId.getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getLeastSignificantBits()) + .setRequestUuidMSB(requestUUID.getMostSignificantBits()) + .setRequestUuidLSB(requestUUID.getLeastSignificantBits()) + .setRequestId(requestId) + .setExpirationTime(expirationTime) + .setOneway(oneway) .setRequestMsg(requestBuilder.build()); - if (request.getOriginServiceId() != null) { - builder.setOriginServiceId(request.getOriginServiceId()); - } return builder.build(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java index 0d9ca2ab4d..ddbeadbcd9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -50,6 +50,7 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; import java.util.UUID; import java.util.concurrent.locks.ReentrantLock; @@ -223,12 +224,14 @@ public class DeviceProcessor extends BaseProcessor { public ListenableFuture processDeviceRpcCallResponseMsg(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) { log.trace("[{}] processDeviceRpcCallResponseMsg [{}]", tenantId, deviceRpcCallMsg); SettableFuture futureToSet = SettableFuture.create(); - UUID uuid = new UUID(deviceRpcCallMsg.getRequestIdMSB(), deviceRpcCallMsg.getRequestIdLSB()); + UUID requestUuid = new UUID(deviceRpcCallMsg.getRequestUuidMSB(), deviceRpcCallMsg.getRequestUuidLSB()); + DeviceId deviceId = new DeviceId(new UUID(deviceRpcCallMsg.getDeviceIdMSB(), deviceRpcCallMsg.getDeviceIdLSB())); + FromDeviceRpcResponse response; if (!StringUtils.isEmpty(deviceRpcCallMsg.getResponseMsg().getError())) { - response = new FromDeviceRpcResponse(uuid, null, RpcError.valueOf(deviceRpcCallMsg.getResponseMsg().getError())); + response = new FromDeviceRpcResponse(requestUuid, null, RpcError.valueOf(deviceRpcCallMsg.getResponseMsg().getError())); } else { - response = new FromDeviceRpcResponse(uuid, deviceRpcCallMsg.getResponseMsg().getResponse(), null); + response = new FromDeviceRpcResponse(requestUuid, deviceRpcCallMsg.getResponseMsg().getResponse(), null); } TbQueueCallback callback = new TbQueueCallback() { @Override @@ -242,7 +245,11 @@ public class DeviceProcessor extends BaseProcessor { futureToSet.setException(t); } }; - tbClusterService.pushNotificationToCore(deviceRpcCallMsg.getOriginServiceId(), response, callback); + FromDeviceRpcResponseActorMsg msg = + new FromDeviceRpcResponseActorMsg(deviceRpcCallMsg.getRequestId(), + tenantId, + deviceId, response); + tbClusterService.pushMsgToCore(msg, callback); return futureToSet; } diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java b/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java index c1e5e2e038..7f684edc32 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java @@ -20,6 +20,7 @@ import lombok.RequiredArgsConstructor; import lombok.ToString; import org.thingsboard.rule.engine.api.RpcError; +import java.io.Serializable; import java.util.Optional; import java.util.UUID; @@ -28,7 +29,7 @@ import java.util.UUID; */ @RequiredArgsConstructor @ToString -public class FromDeviceRpcResponse { +public class FromDeviceRpcResponse implements Serializable { @Getter private final UUID id; private final String response; diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponseActorMsg.java b/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponseActorMsg.java new file mode 100644 index 0000000000..9db6795251 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponseActorMsg.java @@ -0,0 +1,44 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.rpc; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; +import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; + +@ToString +@RequiredArgsConstructor +public class FromDeviceRpcResponseActorMsg implements ToDeviceActorNotificationMsg { + + @Getter + private final Integer requestId; + @Getter + private final TenantId tenantId; + @Getter + private final DeviceId deviceId; + + @Getter + private final FromDeviceRpcResponse msg; + + @Override + public MsgType getMsgType() { + return MsgType.DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG; + } +} diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index 5095d32566..854f75a288 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -1281,7 +1281,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { deviceRpcCallResponseBuilder.setDeviceIdMSB(device.getUuidId().getMostSignificantBits()); deviceRpcCallResponseBuilder.setDeviceIdLSB(device.getUuidId().getLeastSignificantBits()); deviceRpcCallResponseBuilder.setOneway(true); - deviceRpcCallResponseBuilder.setOriginServiceId("originServiceId"); + deviceRpcCallResponseBuilder.setRequestId(0); deviceRpcCallResponseBuilder.setExpirationTime(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10)); RpcResponseMsg.Builder responseBuilder = RpcResponseMsg.newBuilder().setResponse("{}"); diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 4e293740b3..88d93a256a 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -336,11 +336,11 @@ message DeviceCredentialsRequestMsg { message DeviceRpcCallMsg { int64 deviceIdMSB = 1; int64 deviceIdLSB = 2; - int64 requestIdMSB = 3; - int64 requestIdLSB = 4; - int64 expirationTime = 5; - bool oneway = 6; - string originServiceId = 7; + int64 requestUuidMSB = 3; + int64 requestUuidLSB = 4; + int32 requestId = 5; + int64 expirationTime = 6; + bool oneway = 7; RpcRequestMsg requestMsg = 8; RpcResponseMsg responseMsg = 9; } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index d7f0df9838..83aa6c5108 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -82,8 +82,12 @@ public enum MsgType { DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG, + DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG, + DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG, + DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG, + SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG, DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG, diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceEdgeUpdateMsg.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceEdgeUpdateMsg.java new file mode 100644 index 0000000000..7ed10548fb --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceEdgeUpdateMsg.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api.msg; + +import lombok.AllArgsConstructor; +import lombok.Data; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; + +@Data +@AllArgsConstructor +public class DeviceEdgeUpdateMsg implements ToDeviceActorNotificationMsg { + + private final TenantId tenantId; + private final DeviceId deviceId; + private final EdgeId edgeId; + + @Override + public MsgType getMsgType() { + return MsgType.DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG; + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java index 716474aa37..ef8f6236cb 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java @@ -16,10 +16,6 @@ package org.thingsboard.rule.engine.rpc; import com.datastax.driver.core.utils.UUIDs; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; @@ -36,18 +32,10 @@ import org.thingsboard.rule.engine.api.TbRelationTypes; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.edge.EdgeEvent; -import org.thingsboard.server.common.data.edge.EdgeEventActionType; -import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.plugin.ComponentType; -import org.thingsboard.server.common.data.relation.EntityRelation; -import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.msg.TbMsg; -import javax.annotation.Nullable; -import java.util.List; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -66,7 +54,6 @@ import java.util.concurrent.TimeUnit; ) public class TbSendRPCRequestNode implements TbNode { - private static final ObjectMapper json = new ObjectMapper(); private Random random = new Random(); private Gson gson = new Gson(); private JsonParser jsonParser = new JsonParser(); @@ -123,59 +110,19 @@ public class TbSendRPCRequestNode implements TbNode { .restApiCall(restApiCall) .build(); - EdgeId edgeId = findRelatedEdgeId(ctx, msg); - if (edgeId != null) { - sendRpcRequestToEdgeDevice(ctx, msg, edgeId, request); - } else { - ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { - if (!ruleEngineDeviceRpcResponse.getError().isPresent()) { - TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); - ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS); - } else { - TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); - ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name())); - } - }); - } + ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { + if (!ruleEngineDeviceRpcResponse.getError().isPresent()) { + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); + ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS); + } else { + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); + ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name())); + } + }); ctx.ack(msg); } } - private EdgeId findRelatedEdgeId(TbContext ctx, TbMsg msg) { - List result = - ctx.getRelationService().findByToAndType(ctx.getTenantId(), msg.getOriginator(), EntityRelation.EDGE_TYPE, RelationTypeGroup.COMMON); - if (result != null && result.size() > 0) { - EntityRelation relationToEdge = result.get(0); - if (relationToEdge.getFrom() != null && relationToEdge.getFrom().getId() != null) { - return new EdgeId(relationToEdge.getFrom().getId()); - } - } - return null; - } - - private void sendRpcRequestToEdgeDevice(TbContext ctx, TbMsg msg, EdgeId edgeId, RuleEngineDeviceRpcRequest request) { - EdgeEvent edgeEvent = new EdgeEvent(); - edgeEvent.setTenantId(ctx.getTenantId()); - edgeEvent.setAction(EdgeEventActionType.RPC_CALL); - edgeEvent.setEntityId(request.getDeviceId().getId()); - edgeEvent.setType(EdgeEventType.DEVICE); - edgeEvent.setBody(json.valueToTree(request)); - edgeEvent.setEdgeId(edgeId); - ListenableFuture saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent); - Futures.addCallback(saveFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable EdgeEvent event) { - ctx.tellSuccess(msg); - } - - @Override - public void onFailure(Throwable th) { - log.error("Could not save edge event", th); - ctx.tellFailure(msg, th); - } - }, ctx.getDbCallbackExecutor()); - } - @Override public void destroy() { }