diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 05902a6146..021f26207a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -41,7 +41,6 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; -import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg; @@ -152,7 +151,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (request.isOneway() && sent) { logger.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId()); - systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(msg.getMsg().getId(), msg.getServerAddress(), null, null)); + systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null)); } else { registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout); } @@ -174,8 +173,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId()); if (requestMd != null) { logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId()); - systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), - requestMd.getMsg().getServerAddress(), null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); + systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), + null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); } } @@ -207,7 +206,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso ToDeviceRpcRequestBody body = request.getBody(); if (request.isOneway()) { sentOneWayIds.add(entry.getKey()); - systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(request.getId(), requestActorMsg.getServerAddress(), null, null)); + systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(request.getId(), null, null)); } ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder().setRequestId( entry.getKey()).setMethodName(body.getMethod()).setParams(body.getParams()).build(); @@ -400,8 +399,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); boolean success = requestMd != null; if (success) { - systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), - requestMd.getMsg().getServerAddress(), responseMsg.getPayload(), null)); + systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), + responseMsg.getPayload(), null)); } else { logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId()); } diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 4e779d452b..0baeceabc3 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -17,6 +17,7 @@ package org.thingsboard.server.actors.ruleChain; import akka.actor.ActorRef; import com.datastax.driver.core.utils.UUIDs; +import org.springframework.util.StringUtils; import org.thingsboard.rule.engine.api.ListeningExecutor; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest; @@ -35,6 +36,8 @@ import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.common.msg.cluster.ServerType; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetService; @@ -232,16 +235,22 @@ class DefaultTbContext implements TbContext { return new RuleEngineRpcService() { @Override public void sendRpcReply(DeviceId deviceId, int requestId, String body) { - mainCtx.getDeviceRpcService().sendRpcReplyToDevice(nodeCtx.getTenantId(), deviceId, requestId, body); + mainCtx.getDeviceRpcService().sendReplyToRpcCallFromDevice(nodeCtx.getTenantId(), deviceId, requestId, body); } @Override public void sendRpcRequest(RuleEngineDeviceRpcRequest src, Consumer consumer) { ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), nodeCtx.getTenantId(), src.getDeviceId(), src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody())); - mainCtx.getDeviceRpcService().processRpcRequestToDevice(request, response -> { + mainCtx.getDeviceRpcService().forwardServerSideRPCRequestToDeviceActor(request, response -> { if (src.isRestApiCall()) { - mainCtx.getDeviceRpcService().processRestAPIRpcResponseFromRuleEngine(response); + ServerAddress requestOriginAddress; + if (!StringUtils.isEmpty(src.getOriginHost())) { + requestOriginAddress = new ServerAddress(src.getOriginHost(), src.getOriginPort(), ServerType.CORE); + } else { + requestOriginAddress = mainCtx.getRoutingService().getCurrentServer(); + } + mainCtx.getDeviceRpcService().processResponseToServerSideRPCRequestFromRuleEngine(requestOriginAddress, response); } consumer.accept(RuleEngineDeviceRpcResponse.builder() .deviceId(src.getDeviceId()) diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java index f7e80e4baa..08d1dd8a11 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java @@ -35,5 +35,4 @@ public interface ActorService extends SessionMsgProcessor, RpcMsgListener, Disco void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType); - void onMsg(ServiceToRuleEngineMsg serviceToRuleEngineMsg); } diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java index 542a82f160..85b8943569 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java @@ -42,7 +42,6 @@ import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; -import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; import org.thingsboard.server.gen.cluster.ClusterAPIProtos; import org.thingsboard.server.service.cluster.discovery.DiscoveryService; import org.thingsboard.server.service.cluster.discovery.ServerInstance; @@ -159,11 +158,6 @@ public class DefaultActorService implements ActorService { appActor.tell(new SendToClusterMsg(deviceId, msg), ActorRef.noSender()); } - @Override - public void onMsg(ServiceToRuleEngineMsg msg) { - appActor.tell(msg, ActorRef.noSender()); - } - public void broadcast(ToAllNodesMsg msg) { actorContext.getEncodingService().encode(msg); rpcService.broadcast(new RpcBroadcastMsg(ClusterAPIProtos.ClusterMessage @@ -185,7 +179,7 @@ public class DefaultActorService implements ActorService { ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort(), source.getServerType()); if (log.isDebugEnabled()) { log.info("Received msg [{}] from [{}]", msg.getMessageType().name(), serverAddress); - log.info("MSG: ", msg); + log.info("MSG: {}", msg); } switch (msg.getMessageType()) { case CLUSTER_ACTOR_MESSAGE: @@ -219,7 +213,7 @@ public class DefaultActorService implements ActorService { actorContext.getTsSubService().onRemoteTsUpdate(serverAddress, msg.getPayload().toByteArray()); break; case CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE: - actorContext.getDeviceRpcService().processRemoteResponseFromDevice(serverAddress, msg.getPayload().toByteArray()); + actorContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromRemoteServer(serverAddress, msg.getPayload().toByteArray()); break; case CLUSTER_DEVICE_STATE_SERVICE_MESSAGE: actorContext.getDeviceStateService().onRemoteMsg(serverAddress, msg.getPayload().toByteArray()); diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 582bbedebd..be1b6db1d9 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.widget.WidgetsBundle; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetService; @@ -673,7 +674,7 @@ public abstract class BaseController { TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, entityId, metaData, TbMsgDataType.JSON , json.writeValueAsString(entityNode) , null, null, 0L); - actorService.onMsg(new ServiceToRuleEngineMsg(user.getTenantId(), tbMsg)); + actorService.onMsg(new SendToClusterMsg(entityId, new ServiceToRuleEngineMsg(user.getTenantId(), tbMsg))); } catch (Exception e) { log.warn("[{}] Failed to push entity action to rule engine: {}", entityId, actionType, e); } diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java index e13f8656d3..a65c1a5771 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java @@ -90,7 +90,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService { @Override public void processRestAPIRpcRequestToRuleEngine(ToDeviceRpcRequest request, Consumer responseConsumer) { - log.trace("[{}][{}] Processing local rpc call to rule engine [{}]", request.getTenantId(), request.getId(), request.getDeviceId()); + log.trace("[{}][{}] Processing REST API call to rule engine [{}]", request.getTenantId(), request.getId(), request.getDeviceId()); UUID requestId = request.getId(); localToRuleEngineRpcRequests.put(requestId, responseConsumer); sendRpcRequestToRuleEngine(request); @@ -98,31 +98,11 @@ public class DefaultDeviceRpcService implements DeviceRpcService { } @Override - public void processRestAPIRpcResponseFromRuleEngine(FromDeviceRpcResponse response) { - UUID requestId = response.getId(); - Consumer consumer = localToRuleEngineRpcRequests.remove(requestId); - if (consumer != null) { - consumer.accept(response); - } else { - log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response); - } - } - - @Override - public void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer responseConsumer) { - log.trace("[{}][{}] Processing local rpc call to device [{}]", request.getTenantId(), request.getId(), request.getDeviceId()); - UUID requestId = request.getId(); - localToDeviceRpcRequests.put(requestId, responseConsumer); - sendRpcRequestToDevice(request); - scheduleTimeout(request, requestId, localToDeviceRpcRequests); - } - - @Override - public void processRpcResponseFromDevice(FromDeviceRpcResponse response) { - log.trace("[{}] Received device RPC response from server: [{}]", response.getId(), response.getServerAddress()); - if (routingService.getCurrentServer().equals(response.getServerAddress())) { + public void processResponseToServerSideRPCRequestFromRuleEngine(ServerAddress requestOriginAddress, FromDeviceRpcResponse response) { + log.trace("[{}] Received response to server-side RPC request from rule engine: [{}]", response.getId(), requestOriginAddress); + if (routingService.getCurrentServer().equals(requestOriginAddress)) { UUID requestId = response.getId(); - Consumer consumer = localToDeviceRpcRequests.remove(requestId); + Consumer consumer = localToRuleEngineRpcRequests.remove(requestId); if (consumer != null) { consumer.accept(response); } else { @@ -138,12 +118,33 @@ public class DefaultDeviceRpcService implements DeviceRpcService { } else { builder.setError(-1); } - rpcService.tell(response.getServerAddress(), ClusterAPIProtos.MessageType.CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE, builder.build().toByteArray()); + rpcService.tell(requestOriginAddress, ClusterAPIProtos.MessageType.CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE, builder.build().toByteArray()); } } @Override - public void processRemoteResponseFromDevice(ServerAddress serverAddress, byte[] data) { + public void forwardServerSideRPCRequestToDeviceActor(ToDeviceRpcRequest request, Consumer responseConsumer) { + log.trace("[{}][{}] Processing local rpc call to device actor [{}]", request.getTenantId(), request.getId(), request.getDeviceId()); + UUID requestId = request.getId(); + localToDeviceRpcRequests.put(requestId, responseConsumer); + sendRpcRequestToDevice(request); + scheduleTimeout(request, requestId, localToDeviceRpcRequests); + } + + @Override + public void processResponseToServerSideRPCRequestFromDeviceActor(FromDeviceRpcResponse response) { + log.trace("[{}] Received response to server-side RPC request from device actor.", response.getId()); + UUID requestId = response.getId(); + Consumer consumer = localToDeviceRpcRequests.remove(requestId); + if (consumer != null) { + consumer.accept(response); + } else { + log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response); + } + } + + @Override + public void processResponseToServerSideRPCRequestFromRemoteServer(ServerAddress serverAddress, byte[] data) { ClusterAPIProtos.FromDeviceRPCResponseProto proto; try { proto = ClusterAPIProtos.FromDeviceRPCResponseProto.parseFrom(data); @@ -151,13 +152,12 @@ public class DefaultDeviceRpcService implements DeviceRpcService { throw new RuntimeException(e); } RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null; - FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()), serverAddress, - proto.getResponse(), error); - processRpcResponseFromDevice(response); + FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()), proto.getResponse(), error); + processResponseToServerSideRPCRequestFromRuleEngine(routingService.getCurrentServer(), response); } @Override - public void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body) { + public void sendReplyToRpcCallFromDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body) { ToServerRpcResponseActorMsg rpcMsg = new ToServerRpcResponseActorMsg(tenantId, deviceId, new ToServerRpcResponseMsg(requestId, body)); forward(deviceId, rpcMsg); } @@ -166,6 +166,8 @@ public class DefaultDeviceRpcService implements DeviceRpcService { ObjectNode entityNode = json.createObjectNode(); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("requestUUID", msg.getId().toString()); + metaData.putValue("originHost", routingService.getCurrentServer().getHost()); + metaData.putValue("originPort", Integer.toString(routingService.getCurrentServer().getPort())); metaData.putValue("expirationTime", Long.toString(msg.getExpirationTime())); metaData.putValue("oneway", Boolean.toString(msg.isOneway())); @@ -176,7 +178,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService { TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), metaData, TbMsgDataType.JSON , json.writeValueAsString(entityNode) , null, null, 0L); - actorService.onMsg(new ServiceToRuleEngineMsg(msg.getTenantId(), tbMsg)); + actorService.onMsg(new SendToClusterMsg(msg.getDeviceId(), new ServiceToRuleEngineMsg(msg.getTenantId(), tbMsg))); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -199,7 +201,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService { log.trace("[{}] timeout the request: [{}]", this.hashCode(), requestId); Consumer consumer = requestsMap.remove(requestId); if (consumer != null) { - consumer.accept(new FromDeviceRpcResponse(requestId, null, null, RpcError.TIMEOUT)); + consumer.accept(new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT)); } }, timeout, TimeUnit.MILLISECONDS); } diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java index 4cee96ee71..bf12ec664c 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java @@ -29,13 +29,13 @@ public interface DeviceRpcService { void processRestAPIRpcRequestToRuleEngine(ToDeviceRpcRequest request, Consumer responseConsumer); - void processRestAPIRpcResponseFromRuleEngine(FromDeviceRpcResponse response); + void processResponseToServerSideRPCRequestFromRuleEngine(ServerAddress requestOriginAddress, FromDeviceRpcResponse response); - void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer responseConsumer); + void forwardServerSideRPCRequestToDeviceActor(ToDeviceRpcRequest request, Consumer responseConsumer); - void processRpcResponseFromDevice(FromDeviceRpcResponse response); + void processResponseToServerSideRPCRequestFromDeviceActor(FromDeviceRpcResponse response); - void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body); + void processResponseToServerSideRPCRequestFromRemoteServer(ServerAddress serverAddress, byte[] data); - void processRemoteResponseFromDevice(ServerAddress serverAddress, byte[] bytes); + void sendReplyToRpcCallFromDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body); } diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java b/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java index 9c3ce9a834..75506dfacc 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java @@ -32,8 +32,6 @@ import java.util.UUID; public class FromDeviceRpcResponse { @Getter private final UUID id; - @Getter - private final ServerAddress serverAddress; private final String response; private final RpcError error; diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index f4e37db971..610cd3b72a 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -43,6 +43,7 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; import org.thingsboard.server.dao.attributes.AttributesService; @@ -457,7 +458,7 @@ public class DefaultDeviceStateService implements DeviceStateService { TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, stateData.getDeviceId(), stateData.getMetaData().copy(), TbMsgDataType.JSON , json.writeValueAsString(state) , null, null, 0L); - actorService.onMsg(new ServiceToRuleEngineMsg(stateData.getTenantId(), tbMsg)); + actorService.onMsg(new SendToClusterMsg(stateData.getDeviceId(), new ServiceToRuleEngineMsg(stateData.getTenantId(), tbMsg))); } catch (Exception e) { log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e); } diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto index 21c963b15e..1940b36c12 100644 --- a/application/src/main/proto/cluster.proto +++ b/application/src/main/proto/cluster.proto @@ -22,6 +22,7 @@ option java_outer_classname = "ClusterAPIProtos"; service ClusterRpcService { rpc handleMsgs(stream ClusterMessage) returns (stream ClusterMessage) {} } + message ClusterMessage { MessageType messageType = 1; MessageMataInfo messageMetaInfo = 2; @@ -139,4 +140,4 @@ message DeviceStateServiceMsgProto { bool added = 5; bool updated = 6; bool deleted = 7; -} \ No newline at end of file +} diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java index f050f3b505..1de2787af6 100644 --- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java @@ -38,6 +38,7 @@ import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; import org.thingsboard.server.controller.AbstractRuleEngineControllerTest; import org.thingsboard.server.dao.attributes.AttributesService; @@ -155,7 +156,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule device.getId(), new TbMsgMetaData(), "{}", null, null, 0L); - actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)); + actorService.onMsg(new SendToClusterMsg(device.getId(), new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg))); Thread.sleep(3000); @@ -270,7 +271,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule device.getId(), new TbMsgMetaData(), "{}", null, null, 0L); - actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)); + actorService.onMsg(new SendToClusterMsg(device.getId(), new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg))); Thread.sleep(3000); diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java index 24db4570bd..f59dd63614 100644 --- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java @@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; import org.thingsboard.server.controller.AbstractRuleEngineControllerTest; import org.thingsboard.server.dao.attributes.AttributesService; @@ -142,7 +143,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac new TbMsgMetaData(), "{}", null, null, 0L); - actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)); + actorService.onMsg(new SendToClusterMsg(device.getId(), new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg))); Thread.sleep(3000); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java index 0792b63c28..7859623d52 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java @@ -21,11 +21,13 @@ import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbMsg; +import java.io.Serializable; + /** * Created by ashvayka on 15.03.18. */ @Data -public final class ServiceToRuleEngineMsg implements TbActorMsg { +public final class ServiceToRuleEngineMsg implements TbActorMsg, Serializable { private final TenantId tenantId; private final TbMsg tbMsg; diff --git a/common/transport/transport-api/src/main/proto/transport.proto b/common/transport/transport-api/src/main/proto/transport.proto index 0e36dabbf3..ff740d4ce4 100644 --- a/common/transport/transport-api/src/main/proto/transport.proto +++ b/common/transport/transport-api/src/main/proto/transport.proto @@ -172,6 +172,22 @@ message ToServerRpcResponseMsg { string error = 3; } +//Used to report session state to tb-node and persist this state in the cache on the tb-node level. +message SubscriptionInfoProto { + int64 lastActivityTime = 1; + bool attributeSubscription = 2; + bool rpcSubscription = 3; +} + +message SessionSubscriptionInfoProto { + SessionInfoProto sessionInfo = 1; + SubscriptionInfoProto subscriptionInfo = 2; +} + +message DeviceSessionsCacheEntry { + repeated SessionSubscriptionInfoProto sessions = 1; +} + message TransportToDeviceActorMsg { SessionInfoProto sessionInfo = 1; SessionEventMsg sessionEvent = 2; @@ -182,6 +198,7 @@ message TransportToDeviceActorMsg { SubscribeToRPCMsg subscribeToRPC = 7; ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 8; ToServerRpcRequestMsg toServerRPCCallRequest = 9; + SubscriptionInfoProto subscriptionInfo = 10; } message DeviceActorToTransportMsg { @@ -214,4 +231,4 @@ message TransportApiRequestMsg { message TransportApiResponseMsg { ValidateDeviceCredentialsResponseMsg validateTokenResponseMsg = 1; GetOrCreateDeviceFromGatewayResponseMsg getOrCreateDeviceResponseMsg = 2; -} \ No newline at end of file +} diff --git a/msa/pom.xml b/msa/pom.xml index 5fd2212e1e..d6d6c8d6b3 100644 --- a/msa/pom.xml +++ b/msa/pom.xml @@ -23,7 +23,6 @@ 2.2.0-SNAPSHOT thingsboard - org.thingsboard msa pom diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java index 9a93d1e281..f3d57de602 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java @@ -31,6 +31,8 @@ public final class RuleEngineDeviceRpcRequest { private final DeviceId deviceId; private final int requestId; private final UUID requestUUID; + private final String originHost; + private final int originPort; private final boolean oneway; private final String method; private final String body; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java index 5d7e1241f7..8c95b887d2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java @@ -86,6 +86,10 @@ public class TbSendRPCRequestNode implements TbNode { tmp = msg.getMetaData().getValue("requestUUID"); UUID requestUUID = !StringUtils.isEmpty(tmp) ? UUID.fromString(tmp) : UUIDs.timeBased(); + tmp = msg.getMetaData().getValue("originHost"); + String originHost = !StringUtils.isEmpty(tmp) ? tmp : null; + tmp = msg.getMetaData().getValue("originPort"); + int originPort = !StringUtils.isEmpty(tmp) ? Integer.parseInt(tmp) : 0; tmp = msg.getMetaData().getValue("expirationTime"); long expirationTime = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : (System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds())); @@ -105,6 +109,8 @@ public class TbSendRPCRequestNode implements TbNode { .deviceId(new DeviceId(msg.getOriginator().getId())) .requestId(requestId) .requestUUID(requestUUID) + .originHost(originHost) + .originPort(originPort) .expirationTime(expirationTime) .restApiCall(restApiCall) .build();