diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 3345f81a34..2f5e20de42 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -23,7 +23,6 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.gson.Gson; -import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import org.thingsboard.server.actors.ActorSystemContext; @@ -47,7 +46,7 @@ import org.thingsboard.server.common.msg.core.AttributesUpdateRequest; import org.thingsboard.server.common.msg.core.BasicCommandAckResponse; import org.thingsboard.server.common.msg.core.BasicGetAttributesResponse; import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse; -import org.thingsboard.server.common.msg.core.BasicToDeviceSessionActorMsg; +import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg; import org.thingsboard.server.common.msg.core.GetAttributesRequest; import org.thingsboard.server.common.msg.core.RuleEngineError; import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg; @@ -57,13 +56,12 @@ import org.thingsboard.server.common.msg.core.SessionOpenMsg; import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg; import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg; -import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg; +import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg; import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg; import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg; import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.common.msg.session.FromDeviceMsg; -import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.msg.session.SessionType; import org.thingsboard.server.common.msg.session.ToDeviceMsg; @@ -74,7 +72,6 @@ import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotific import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse; import org.thingsboard.server.extensions.api.plugins.msg.RpcError; -import org.thingsboard.server.gen.cluster.ClusterAPIProtos; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg; @@ -156,7 +153,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso boolean sent = rpcSubscriptions.size() > 0; Set syncSessionSet = new HashSet<>(); rpcSubscriptions.entrySet().forEach(sub -> { - ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(rpcRequest, sub.getKey()); + ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey()); sendMsgToSessionActor(response, sub.getValue().getServer()); if (SessionType.SYNC == sub.getValue().getType()) { syncSessionSet.add(sub.getKey()); @@ -198,7 +195,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (data != null) { logger.debug("[{}] Queue put [{}] timeout detected!", deviceId, msg.getId()); ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(data.getSessionMsgType(), RuleEngineError.QUEUE_PUT_TIMEOUT); - sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress()); + sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress()); } } @@ -210,7 +207,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso logger.debug("[{}] Queue put [{}] ack detected. Remaining acks: {}!", deviceId, msg.getId(), remainingAcks); if (remainingAcks == 0) { ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId()); - sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress()); + sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress()); } } } @@ -248,7 +245,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso body.getMethod(), body.getParams() ); - ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(rpcRequest, sessionId); + ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sessionId); sendMsgToSessionActor(response, server); }; } @@ -302,14 +299,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso public void onSuccess(@Nullable List> result) { BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(), request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1))); - sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress()); + sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress()); } @Override public void onFailure(Throwable t) { if (t instanceof Exception) { ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t); - sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress()); + sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress()); } else { logger.error("[{}] Failed to process attributes request", deviceId, t); } @@ -391,14 +388,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (data != null) { logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId()); ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT); - sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer()); + sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer()); } } void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) { ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId()); if (data != null) { - sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer()); + sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer()); } } @@ -409,7 +406,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso pendingMsgs.put(tbMsg.getId(), pendingMsgData); scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout()); } else { - ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId()); + ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId()); sendMsgToSessionActor(response, pendingMsgData.getServerAddress()); } context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self()); @@ -436,7 +433,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (notification != null) { ToDeviceMsg finalNotification = notification; attributeSubscriptions.entrySet().forEach(sub -> { - ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(finalNotification, sub.getKey()); + ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(finalNotification, sub.getKey()); sendMsgToSessionActor(response, sub.getValue().getServer()); }); } @@ -462,7 +459,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso BasicCommandAckResponse response = success ? BasicCommandAckResponse.onSuccess(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId()) : BasicCommandAckResponse.onError(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId(), new TimeoutException()); - sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress()); + sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress()); } } } @@ -517,7 +514,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } - private void sendMsgToSessionActor(ToDeviceSessionActorMsg response, Optional sessionAddress) { + private void sendMsgToSessionActor(ActorSystemToDeviceSessionActorMsg response, Optional sessionAddress) { if (sessionAddress.isPresent()) { ServerAddress address = sessionAddress.get(); logger.debug("{} Forwarding msg: {}", address, response); @@ -530,7 +527,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso void processCredentialsUpdate() { sessions.forEach((k, v) -> { - sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer()); + sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer()); }); attributeSubscriptions.clear(); rpcSubscriptions.clear(); diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index f8bc94fcf9..85d1b545b3 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -43,6 +43,7 @@ import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.service.script.NashornJsEngine; import scala.concurrent.duration.Duration; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -63,15 +64,24 @@ class DefaultTbContext implements TbContext { @Override public void tellNext(TbMsg msg, String relationType) { - tellNext(msg, relationType, null); + tellNext(msg, Collections.singleton(relationType), null); + } + + @Override + public void tellNext(TbMsg msg, Set relationTypes) { + tellNext(msg, relationTypes, null); } @Override public void tellNext(TbMsg msg, String relationType, Throwable th) { + tellNext(msg, Collections.singleton(relationType), th); + } + + private void tellNext(TbMsg msg, Set relationTypes, Throwable th) { if (nodeCtx.getSelf().isDebugMode()) { - mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th); + relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th)); } - nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationType, msg), nodeCtx.getSelfActor()); + nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg), nodeCtx.getSelfActor()); } @Override @@ -117,12 +127,6 @@ class DefaultTbContext implements TbContext { return nodeCtx.getTenantId(); } - @Override - public void tellNext(TbMsg msg, Set relationTypes) { - //TODO: fix this to send set of relations instead of loop. - relationTypes.forEach(type -> tellNext(msg, type)); - } - @Override public ListeningExecutor getJsExecutor() { return mainCtx.getJsExecutor(); diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java index 4812002ec2..3ba646aaf8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java @@ -54,6 +54,8 @@ public class RuleChainActor extends ComponentActor relations = nodeRoutes.get(originator).stream() - .filter(r -> targetRelationType == null || targetRelationType.equalsIgnoreCase(r.getType())) + .filter(r -> contains(envelope.getRelationTypes(), r.getType())) .collect(Collectors.toList()); TbMsg msg = envelope.getMsg(); @@ -237,6 +236,18 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor relationTypes, String type) { + if (relationTypes == null) { + return true; + } + for (String relationType : relationTypes) { + if (relationType.equalsIgnoreCase(type)) { + return true; + } + } + return false; + } + private void enqueueAndForwardMsgCopyToChain(TbMsg msg, EntityId target, String fromRelationType) { RuleChainId targetRCId = new RuleChainId(target.getId()); TbMsg copyMsg = msg.copy(UUIDs.timeBased(), targetRCId, null, DEFAULT_CLUSTER_PARTITION); diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java index 054284dc59..c0a475c953 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java @@ -21,6 +21,8 @@ import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbMsg; +import java.util.Set; + /** * Created by ashvayka on 19.03.18. */ @@ -28,7 +30,7 @@ import org.thingsboard.server.common.msg.TbMsg; final class RuleNodeToRuleChainTellNextMsg implements TbActorMsg { private final RuleNodeId originator; - private final String relationType; + private final Set relationTypes; private final TbMsg msg; @Override diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java index 1d9c671b6e..3624127194 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java @@ -39,8 +39,12 @@ public abstract class ContextAwareActor extends UntypedActor { logger.debug("Processing msg: {}", msg); } if (msg instanceof TbActorMsg) { - if(!process((TbActorMsg) msg)){ - logger.warning("Unknown message: {}!", msg); + try { + if (!process((TbActorMsg) msg)) { + logger.warning("Unknown message: {}!", msg); + } + } catch (Exception e) { + throw e; } } else { logger.warning("Unknown message: {}!", msg); diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java index 56f3228afe..c80e913ba5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java @@ -211,6 +211,10 @@ public class DefaultActorService implements ActorService { @Override public void onReceivedMsg(ServerAddress source, ClusterAPIProtos.ClusterMessage msg) { ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort()); + log.info("Received msg [{}] from [{}]", msg.getMessageType().name(), serverAddress); + if(log.isDebugEnabled()){ + log.info("MSG: ", msg); + } switch (msg.getMessageType()) { case CLUSTER_ACTOR_MESSAGE: java.util.Optional decodedMsg = actorContext.getEncodingService() diff --git a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java index fa5287f6f2..83d56e0bac 100644 --- a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java @@ -46,7 +46,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor { } @Override - protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) { + protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) { updateSessionCtx(msg, SessionType.ASYNC); if (firstMsg) { toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m)); diff --git a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java index fab51a5eb3..42e127e5d4 100644 --- a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java @@ -45,7 +45,7 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP this.sessionId = sessionId; } - protected abstract void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg); + protected abstract void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg); protected abstract void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg); @@ -63,12 +63,12 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP protected void cleanupSession(ActorContext ctx) { } - protected void updateSessionCtx(ToDeviceActorSessionMsg msg, SessionType type) { + protected void updateSessionCtx(TransportToDeviceSessionActorMsg msg, SessionType type) { sessionCtx = msg.getSessionMsg().getSessionContext(); deviceToDeviceActorMsgPrototype = new BasicDeviceToDeviceActorMsg(msg, type); } - protected DeviceToDeviceActorMsg toDeviceMsg(ToDeviceActorSessionMsg msg) { + protected DeviceToDeviceActorMsg toDeviceMsg(TransportToDeviceSessionActorMsg msg) { AdaptorToSessionActorMsg adaptorMsg = msg.getSessionMsg(); return new BasicDeviceToDeviceActorMsg(deviceToDeviceActorMsgPrototype, adaptorMsg.getMsg()); } diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java index 9d324c5b94..f67d46b61b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java @@ -17,7 +17,6 @@ package org.thingsboard.server.actors.session; import akka.actor.OneForOneStrategy; import akka.actor.SupervisorStrategy; -import akka.japi.Function; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.service.ContextAwareActor; import org.thingsboard.server.actors.service.ContextBasedCreator; @@ -25,8 +24,8 @@ import org.thingsboard.server.actors.shared.SessionTimeoutMsg; import org.thingsboard.server.common.data.id.SessionId; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; -import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg; -import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg; +import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg; +import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg; import org.thingsboard.server.common.msg.session.SessionCtrlMsg; import org.thingsboard.server.common.msg.session.SessionMsg; import org.thingsboard.server.common.msg.session.SessionType; @@ -63,38 +62,37 @@ public class SessionActor extends ContextAwareActor { @Override protected boolean process(TbActorMsg msg) { - //TODO Move everything here, to work with TbActorMsg - return false; - } - - @Override - public void onReceive(Object msg) throws Exception { - logger.debug("[{}] Processing: {}.", sessionId, msg); - if (msg instanceof ToDeviceActorSessionMsg) { - processDeviceMsg((ToDeviceActorSessionMsg) msg); - } else if (msg instanceof ToDeviceSessionActorMsg) { - processToDeviceMsg((ToDeviceSessionActorMsg) msg); - } else if (msg instanceof SessionTimeoutMsg) { - processTimeoutMsg((SessionTimeoutMsg) msg); - } else if (msg instanceof SessionCtrlMsg) { - processSessionCtrlMsg((SessionCtrlMsg) msg); - } else if (msg instanceof ClusterEventMsg) { - processClusterEvent((ClusterEventMsg) msg); - } else { - logger.warning("[{}] Unknown msg: {}", sessionId, msg); + switch (msg.getMsgType()) { + case TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG: + processTransportToSessionMsg((TransportToDeviceSessionActorMsg) msg); + break; + case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG: + processActorsToSessionMsg((ActorSystemToDeviceSessionActorMsg) msg); + break; + case SESSION_TIMEOUT_MSG: + processTimeoutMsg((SessionTimeoutMsg) msg); + break; + case SESSION_CTRL_MSG: + processSessionCloseMsg((SessionCtrlMsg) msg); + break; + case CLUSTER_EVENT_MSG: + processClusterEvent((ClusterEventMsg) msg); + break; + default: return false; } + return true; } private void processClusterEvent(ClusterEventMsg msg) { processor.processClusterEvent(context(), msg); } - private void processDeviceMsg(ToDeviceActorSessionMsg msg) { + private void processTransportToSessionMsg(TransportToDeviceSessionActorMsg msg) { initProcessor(msg); processor.processToDeviceActorMsg(context(), msg); } - private void processToDeviceMsg(ToDeviceSessionActorMsg msg) { + private void processActorsToSessionMsg(ActorSystemToDeviceSessionActorMsg msg) { processor.processToDeviceMsg(context(), msg.getMsg()); } @@ -106,7 +104,7 @@ public class SessionActor extends ContextAwareActor { } } - private void processSessionCtrlMsg(SessionCtrlMsg msg) { + private void processSessionCloseMsg(SessionCtrlMsg msg) { if (processor != null) { processor.processSessionCtrlMsg(context(), msg); } else if (msg instanceof SessionCloseMsg) { @@ -116,7 +114,7 @@ public class SessionActor extends ContextAwareActor { } } - private void initProcessor(ToDeviceActorSessionMsg msg) { + private void initProcessor(TransportToDeviceSessionActorMsg msg) { if (processor == null) { SessionMsg sessionMsg = (SessionMsg) msg.getSessionMsg(); if (sessionMsg.getSessionContext().getSessionType() == SessionType.SYNC) { diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java index b5b17917b9..1f8bb6defd 100644 --- a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java @@ -17,7 +17,6 @@ package org.thingsboard.server.actors.session; import java.util.HashMap; import java.util.Map; -import java.util.UUID; import akka.actor.*; import org.thingsboard.server.actors.ActorSystemContext; @@ -33,8 +32,9 @@ import akka.event.Logging; import akka.event.LoggingAdapter; import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; import org.thingsboard.server.common.msg.core.SessionCloseMsg; -import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg; +import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg; import org.thingsboard.server.common.msg.session.SessionCtrlMsg; +import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg; public class SessionManagerActor extends ContextAwareActor { @@ -104,7 +104,7 @@ public class SessionManagerActor extends ContextAwareActor { } private void forwardToSessionActor(SessionAwareMsg msg) { - if (msg instanceof ToDeviceSessionActorMsg || msg instanceof SessionCloseMsg) { + if (msg instanceof ActorSystemToDeviceSessionActorMsg || msg instanceof SessionCloseMsg) { String sessionIdStr = msg.getSessionId().toUidStr(); ActorRef sessionActor = sessionActors.get(sessionIdStr); if (sessionActor != null) { diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java index 6f71d79e06..a0f0a88246 100644 --- a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java @@ -22,7 +22,7 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg; import org.thingsboard.server.common.msg.session.*; -import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg; +import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg; import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; import org.thingsboard.server.common.msg.session.ex.SessionException; @@ -41,7 +41,7 @@ class SyncMsgProcessor extends AbstractSessionActorMsgProcessor { } @Override - protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) { + protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) { updateSessionCtx(msg, SessionType.SYNC); pendingMsg = toDeviceMsg(msg); pendingResponse = true; diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java b/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java index 7d6dbca6cd..d015fe01de 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java @@ -17,13 +17,20 @@ package org.thingsboard.server.actors.shared; import lombok.Data; import org.thingsboard.server.common.data.id.SessionId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.TbActorMsg; import java.io.Serializable; @Data -public class SessionTimeoutMsg implements Serializable { +public class SessionTimeoutMsg implements Serializable, TbActorMsg { private static final long serialVersionUID = 1L; private final SessionId sessionId; + + @Override + public MsgType getMsgType() { + return MsgType.SESSION_TIMEOUT_MSG; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java index 40677976eb..8ab6f99bfc 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java @@ -107,7 +107,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D @Override public void onServerAdded(ServerInstance server) { - log.debug("On server added event: {}", server); + log.info("On server added event: {}", server); addNode(server); logCircle(); } @@ -119,7 +119,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D @Override public void onServerRemoved(ServerInstance server) { - log.debug("On server removed event: {}", server); + log.info("On server removed event: {}", server); removeNode(server); logCircle(); } diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java index b73a6f5161..923621917a 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java @@ -22,16 +22,11 @@ import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.util.SerializationUtils; import org.thingsboard.server.actors.rpc.RpcBroadcastMsg; import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg; -import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg; -import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg; -import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription; import org.thingsboard.server.gen.cluster.ClusterAPIProtos; import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc; diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java index 0dc324b335..de29b89df2 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java @@ -19,15 +19,7 @@ import io.grpc.stub.StreamObserver; import org.thingsboard.server.actors.rpc.RpcBroadcastMsg; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg; -import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg; -import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg; -import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg; -import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg; -import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg; -import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription; import org.thingsboard.server.gen.cluster.ClusterAPIProtos; -import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import java.util.UUID; diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml index 1779912366..978a570e66 100644 --- a/application/src/main/resources/logback.xml +++ b/application/src/main/resources/logback.xml @@ -25,7 +25,7 @@ - + diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index c7a96db67c..7702788704 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -101,6 +101,6 @@ public enum MsgType { /** * Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue. */ - RULE_ENGINE_QUEUE_PUT_ACK_MSG; + RULE_ENGINE_QUEUE_PUT_ACK_MSG, ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG, TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG, SESSION_TIMEOUT_MSG, SESSION_CTRL_MSG; } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToDeviceSessionActorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/ActorSystemToDeviceSessionActorMsg.java similarity index 90% rename from common/message/src/main/java/org/thingsboard/server/common/msg/core/ToDeviceSessionActorMsg.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/core/ActorSystemToDeviceSessionActorMsg.java index d190579386..6bb5c9911d 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToDeviceSessionActorMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/ActorSystemToDeviceSessionActorMsg.java @@ -24,7 +24,7 @@ import java.io.Serializable; /** * @author Andrew Shvayka */ -public interface ToDeviceSessionActorMsg extends SessionAwareMsg, Serializable, TbActorMsg { +public interface ActorSystemToDeviceSessionActorMsg extends SessionAwareMsg, Serializable, TbActorMsg { ToDeviceMsg getMsg(); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicToDeviceSessionActorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicActorSystemToDeviceSessionActorMsg.java similarity index 84% rename from common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicToDeviceSessionActorMsg.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicActorSystemToDeviceSessionActorMsg.java index d01c2b491a..cb931f482b 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicToDeviceSessionActorMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicActorSystemToDeviceSessionActorMsg.java @@ -19,12 +19,12 @@ import org.thingsboard.server.common.data.id.SessionId; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.session.ToDeviceMsg; -public class BasicToDeviceSessionActorMsg implements ToDeviceSessionActorMsg { +public class BasicActorSystemToDeviceSessionActorMsg implements ActorSystemToDeviceSessionActorMsg { private final ToDeviceMsg msg; private final SessionId sessionId; - public BasicToDeviceSessionActorMsg(ToDeviceMsg msg, SessionId sessionId) { + public BasicActorSystemToDeviceSessionActorMsg(ToDeviceMsg msg, SessionId sessionId) { super(); this.msg = msg; this.sessionId = sessionId; @@ -47,6 +47,6 @@ public class BasicToDeviceSessionActorMsg implements ToDeviceSessionActorMsg { @Override public MsgType getMsgType() { - return null; + return MsgType.ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG; } } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java index 1da19cb3cb..92c5105673 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java @@ -24,7 +24,7 @@ import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.msg.session.FromDeviceMsg; import org.thingsboard.server.common.msg.session.SessionType; -import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg; +import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg; import java.util.Optional; @@ -45,7 +45,7 @@ public class BasicDeviceToDeviceActorMsg implements DeviceToDeviceActorMsg { this(null, other.getTenantId(), other.getCustomerId(), other.getDeviceId(), other.getSessionId(), other.getSessionType(), msg); } - public BasicDeviceToDeviceActorMsg(ToDeviceActorSessionMsg msg, SessionType sessionType) { + public BasicDeviceToDeviceActorMsg(TransportToDeviceSessionActorMsg msg, SessionType sessionType) { this(null, msg.getTenantId(), msg.getCustomerId(), msg.getDeviceId(), msg.getSessionId(), sessionType, msg.getSessionMsg().getMsg()); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/BasicToDeviceActorSessionMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/BasicTransportToDeviceSessionActorMsg.java similarity index 75% rename from common/message/src/main/java/org/thingsboard/server/common/msg/session/BasicToDeviceActorSessionMsg.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/session/BasicTransportToDeviceSessionActorMsg.java index a318538ba8..0704e6e1bf 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/BasicToDeviceActorSessionMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/BasicTransportToDeviceSessionActorMsg.java @@ -20,15 +20,16 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.SessionId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; -public class BasicToDeviceActorSessionMsg implements ToDeviceActorSessionMsg { +public class BasicTransportToDeviceSessionActorMsg implements TransportToDeviceSessionActorMsg { private final TenantId tenantId; private final CustomerId customerId; private final DeviceId deviceId; private final AdaptorToSessionActorMsg msg; - public BasicToDeviceActorSessionMsg(Device device, AdaptorToSessionActorMsg msg) { + public BasicTransportToDeviceSessionActorMsg(Device device, AdaptorToSessionActorMsg msg) { super(); this.tenantId = device.getTenantId(); this.customerId = device.getCustomerId(); @@ -36,13 +37,6 @@ public class BasicToDeviceActorSessionMsg implements ToDeviceActorSessionMsg { this.msg = msg; } - public BasicToDeviceActorSessionMsg(ToDeviceActorSessionMsg deviceMsg) { - this.tenantId = deviceMsg.getTenantId(); - this.customerId = deviceMsg.getCustomerId(); - this.deviceId = deviceMsg.getDeviceId(); - this.msg = deviceMsg.getSessionMsg(); - } - @Override public DeviceId getDeviceId() { return deviceId; @@ -69,8 +63,12 @@ public class BasicToDeviceActorSessionMsg implements ToDeviceActorSessionMsg { @Override public String toString() { - return "BasicToDeviceActorSessionMsg [tenantId=" + tenantId + ", customerId=" + customerId + ", deviceId=" + deviceId + ", msg=" + msg + return "BasicTransportToDeviceSessionActorMsg [tenantId=" + tenantId + ", customerId=" + customerId + ", deviceId=" + deviceId + ", msg=" + msg + "]"; } + @Override + public MsgType getMsgType() { + return MsgType.TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG; + } } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java index 19ca219401..8082f72184 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java @@ -15,8 +15,9 @@ */ package org.thingsboard.server.common.msg.session; +import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.aware.SessionAwareMsg; -public interface SessionCtrlMsg extends SessionAwareMsg { +public interface SessionCtrlMsg extends SessionAwareMsg, TbActorMsg { } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ToDeviceActorSessionMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/TransportToDeviceSessionActorMsg.java similarity index 83% rename from common/message/src/main/java/org/thingsboard/server/common/msg/session/ToDeviceActorSessionMsg.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/session/TransportToDeviceSessionActorMsg.java index 9ecfe19604..deaef3d3e5 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ToDeviceActorSessionMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/TransportToDeviceSessionActorMsg.java @@ -15,12 +15,13 @@ */ package org.thingsboard.server.common.msg.session; +import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.aware.CustomerAwareMsg; import org.thingsboard.server.common.msg.aware.DeviceAwareMsg; import org.thingsboard.server.common.msg.aware.SessionAwareMsg; import org.thingsboard.server.common.msg.aware.TenantAwareMsg; -public interface ToDeviceActorSessionMsg extends DeviceAwareMsg, CustomerAwareMsg, TenantAwareMsg, SessionAwareMsg { +public interface TransportToDeviceSessionActorMsg extends DeviceAwareMsg, CustomerAwareMsg, TenantAwareMsg, SessionAwareMsg, TbActorMsg { AdaptorToSessionActorMsg getSessionMsg(); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java index a24bdcde90..49fad13236 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java @@ -16,6 +16,7 @@ package org.thingsboard.server.common.msg.session.ctrl; import org.thingsboard.server.common.data.id.SessionId; +import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.session.SessionCtrlMsg; public class SessionCloseMsg implements SessionCtrlMsg { @@ -60,4 +61,8 @@ public class SessionCloseMsg implements SessionCtrlMsg { return timeout; } + @Override + public MsgType getMsgType() { + return MsgType.SESSION_CTRL_MSG; + } } diff --git a/pom.xml b/pom.xml index d6a0bef32a..dfccb7d9f5 100755 --- a/pom.xml +++ b/pom.xml @@ -59,7 +59,7 @@ 1.7 2.0 1.4.3 - 2.11.0 + 4.0.1 3.0.2 1.12.0 1.16.18 diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 7674549ba8..bac68ffc58 100644 --- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -38,8 +38,6 @@ import org.thingsboard.server.common.transport.quota.QuotaService; import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; import org.thingsboard.server.transport.coap.session.CoapExchangeObserverProxy; import org.thingsboard.server.transport.coap.session.CoapSessionCtx; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.util.ReflectionUtils; @Slf4j @@ -186,7 +184,7 @@ public class CoapTransportResource extends CoapResource { throw new IllegalArgumentException("Unsupported msg type: " + type); } log.trace("Processing msg: {}", msg); - processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg)); + processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg)); } catch (AdaptorException e) { log.debug("Failed to decode payload {}", e); exchange.respond(ResponseCode.BAD_REQUEST, e.getMessage()); diff --git a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java index 60b22206ac..320f06e7f1 100644 --- a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java +++ b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java @@ -108,8 +108,8 @@ public class CoapServerTest { @Override public void process(SessionAwareMsg toActorMsg) { - if (toActorMsg instanceof ToDeviceActorSessionMsg) { - AdaptorToSessionActorMsg sessionMsg = ((ToDeviceActorSessionMsg) toActorMsg).getSessionMsg(); + if (toActorMsg instanceof TransportToDeviceSessionActorMsg) { + AdaptorToSessionActorMsg sessionMsg = ((TransportToDeviceSessionActorMsg) toActorMsg).getSessionMsg(); try { FromDeviceMsg deviceMsg = sessionMsg.getMsg(); ToDeviceMsg toDeviceMsg = null; diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index 4d90b5f21c..930b442447 100644 --- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.security.DeviceTokenCredentials; import org.thingsboard.server.common.msg.core.*; import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg; import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg; -import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg; +import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg; import org.thingsboard.server.common.msg.session.FromDeviceMsg; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.adaptor.JsonConverter; @@ -219,7 +219,7 @@ public class DeviceApiController { private void process(HttpSessionCtx ctx, FromDeviceMsg request) { AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request); - processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg)); + processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg)); } private boolean quotaExceeded(HttpServletRequest request, DeferredResult responseWriter) { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 5ccce350ac..0b3881723d 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.security.DeviceTokenCredentials; import org.thingsboard.server.common.data.security.DeviceX509Credentials; import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg; -import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg; +import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg; import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.adaptor.AdaptorException; @@ -207,7 +207,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); } if (msg != null) { - processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg)); + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); } else { log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId); ctx.close(); @@ -227,11 +227,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement try { if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); - processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg)); + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); grantedQoSList.add(getMinSupportedQos(reqQoS)); } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) { AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); - processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg)); + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); grantedQoSList.add(getMinSupportedQos(reqQoS)); } else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) { grantedQoSList.add(getMinSupportedQos(reqQoS)); @@ -261,10 +261,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement try { if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); - processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg)); + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) { AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); - processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg)); + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) { deviceSessionCtx.setDisallowAttributeResponses(); } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java index 2056452625..f666bb8a93 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java @@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.msg.core.*; import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg; -import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg; +import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg; import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.adaptor.AdaptorException; @@ -96,8 +96,8 @@ public class GatewaySessionCtx { GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device); devices.put(deviceName, ctx); log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName); - processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg()))); - processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg()))); + processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg()))); + processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg()))); } } @@ -136,7 +136,7 @@ public class GatewaySessionCtx { JsonConverter.parseWithTs(request, element.getAsJsonObject()); } GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); - processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); } } else { @@ -152,7 +152,7 @@ public class GatewaySessionCtx { Integer requestId = jsonObj.get("id").getAsInt(); String data = jsonObj.get("data").toString(); GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); - processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data)))); } else { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); @@ -174,7 +174,7 @@ public class GatewaySessionCtx { JsonObject deviceData = deviceEntry.getValue().getAsJsonObject(); request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList())); GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); - processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); } } else { @@ -207,7 +207,7 @@ public class GatewaySessionCtx { request = new BasicGetAttributesRequest(requestId, null, keys); } GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); - processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); ack(msg); } else {