Improvements to Cluster mode

This commit is contained in:
Andrew Shvayka 2018-05-17 09:13:06 +03:00
parent e834dd3689
commit 36f4fb03a4
31 changed files with 143 additions and 124 deletions

View File

@ -23,7 +23,6 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.thingsboard.server.actors.ActorSystemContext;
@ -47,7 +46,7 @@ import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
import org.thingsboard.server.common.msg.core.BasicCommandAckResponse;
import org.thingsboard.server.common.msg.core.BasicGetAttributesResponse;
import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
import org.thingsboard.server.common.msg.core.BasicToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.core.GetAttributesRequest;
import org.thingsboard.server.common.msg.core.RuleEngineError;
import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg;
@ -57,13 +56,12 @@ import org.thingsboard.server.common.msg.core.SessionOpenMsg;
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.msg.session.SessionType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
@ -74,7 +72,6 @@ import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotific
import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
@ -156,7 +153,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
boolean sent = rpcSubscriptions.size() > 0;
Set<SessionId> syncSessionSet = new HashSet<>();
rpcSubscriptions.entrySet().forEach(sub -> {
ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(rpcRequest, sub.getKey());
ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey());
sendMsgToSessionActor(response, sub.getValue().getServer());
if (SessionType.SYNC == sub.getValue().getType()) {
syncSessionSet.add(sub.getKey());
@ -198,7 +195,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (data != null) {
logger.debug("[{}] Queue put [{}] timeout detected!", deviceId, msg.getId());
ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(data.getSessionMsgType(), RuleEngineError.QUEUE_PUT_TIMEOUT);
sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
}
}
@ -210,7 +207,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
logger.debug("[{}] Queue put [{}] ack detected. Remaining acks: {}!", deviceId, msg.getId(), remainingAcks);
if (remainingAcks == 0) {
ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());
sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
}
}
}
@ -248,7 +245,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
body.getMethod(),
body.getParams()
);
ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(rpcRequest, sessionId);
ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sessionId);
sendMsgToSessionActor(response, server);
};
}
@ -302,14 +299,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1)));
sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
}
@Override
public void onFailure(Throwable t) {
if (t instanceof Exception) {
ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t);
sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
} else {
logger.error("[{}] Failed to process attributes request", deviceId, t);
}
@ -391,14 +388,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (data != null) {
logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT);
sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
}
}
void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) {
ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId());
if (data != null) {
sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
}
}
@ -409,7 +406,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
pendingMsgs.put(tbMsg.getId(), pendingMsgData);
scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout());
} else {
ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId());
ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId());
sendMsgToSessionActor(response, pendingMsgData.getServerAddress());
}
context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self());
@ -436,7 +433,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (notification != null) {
ToDeviceMsg finalNotification = notification;
attributeSubscriptions.entrySet().forEach(sub -> {
ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(finalNotification, sub.getKey());
ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(finalNotification, sub.getKey());
sendMsgToSessionActor(response, sub.getValue().getServer());
});
}
@ -462,7 +459,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
BasicCommandAckResponse response = success
? BasicCommandAckResponse.onSuccess(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId())
: BasicCommandAckResponse.onError(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId(), new TimeoutException());
sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());
sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());
}
}
}
@ -517,7 +514,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
private void sendMsgToSessionActor(ToDeviceSessionActorMsg response, Optional<ServerAddress> sessionAddress) {
private void sendMsgToSessionActor(ActorSystemToDeviceSessionActorMsg response, Optional<ServerAddress> sessionAddress) {
if (sessionAddress.isPresent()) {
ServerAddress address = sessionAddress.get();
logger.debug("{} Forwarding msg: {}", address, response);
@ -530,7 +527,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void processCredentialsUpdate() {
sessions.forEach((k, v) -> {
sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());
sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());
});
attributeSubscriptions.clear();
rpcSubscriptions.clear();

View File

@ -43,6 +43,7 @@ import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.service.script.NashornJsEngine;
import scala.concurrent.duration.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -63,15 +64,24 @@ class DefaultTbContext implements TbContext {
@Override
public void tellNext(TbMsg msg, String relationType) {
tellNext(msg, relationType, null);
tellNext(msg, Collections.singleton(relationType), null);
}
@Override
public void tellNext(TbMsg msg, Set<String> relationTypes) {
tellNext(msg, relationTypes, null);
}
@Override
public void tellNext(TbMsg msg, String relationType, Throwable th) {
tellNext(msg, Collections.singleton(relationType), th);
}
private void tellNext(TbMsg msg, Set<String> relationTypes, Throwable th) {
if (nodeCtx.getSelf().isDebugMode()) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th);
relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
}
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationType, msg), nodeCtx.getSelfActor());
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg), nodeCtx.getSelfActor());
}
@Override
@ -117,12 +127,6 @@ class DefaultTbContext implements TbContext {
return nodeCtx.getTenantId();
}
@Override
public void tellNext(TbMsg msg, Set<String> relationTypes) {
//TODO: fix this to send set of relations instead of loop.
relationTypes.forEach(type -> tellNext(msg, type));
}
@Override
public ListeningExecutor getJsExecutor() {
return mainCtx.getJsExecutor();

View File

@ -54,6 +54,8 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
case RULE_CHAIN_TO_RULE_CHAIN_MSG:
processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);
break;
case CLUSTER_EVENT_MSG:
break;
default:
return false;
}

View File

@ -206,9 +206,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
checkActive();
RuleNodeId originator = envelope.getOriginator();
String targetRelationType = envelope.getRelationType();
List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream()
.filter(r -> targetRelationType == null || targetRelationType.equalsIgnoreCase(r.getType()))
.filter(r -> contains(envelope.getRelationTypes(), r.getType()))
.collect(Collectors.toList());
TbMsg msg = envelope.getMsg();
@ -237,6 +236,18 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
}
private boolean contains(Set<String> relationTypes, String type) {
if (relationTypes == null) {
return true;
}
for (String relationType : relationTypes) {
if (relationType.equalsIgnoreCase(type)) {
return true;
}
}
return false;
}
private void enqueueAndForwardMsgCopyToChain(TbMsg msg, EntityId target, String fromRelationType) {
RuleChainId targetRCId = new RuleChainId(target.getId());
TbMsg copyMsg = msg.copy(UUIDs.timeBased(), targetRCId, null, DEFAULT_CLUSTER_PARTITION);

View File

@ -21,6 +21,8 @@ import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.Set;
/**
* Created by ashvayka on 19.03.18.
*/
@ -28,7 +30,7 @@ import org.thingsboard.server.common.msg.TbMsg;
final class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
private final RuleNodeId originator;
private final String relationType;
private final Set<String> relationTypes;
private final TbMsg msg;
@Override

View File

@ -39,8 +39,12 @@ public abstract class ContextAwareActor extends UntypedActor {
logger.debug("Processing msg: {}", msg);
}
if (msg instanceof TbActorMsg) {
if(!process((TbActorMsg) msg)){
logger.warning("Unknown message: {}!", msg);
try {
if (!process((TbActorMsg) msg)) {
logger.warning("Unknown message: {}!", msg);
}
} catch (Exception e) {
throw e;
}
} else {
logger.warning("Unknown message: {}!", msg);

View File

@ -211,6 +211,10 @@ public class DefaultActorService implements ActorService {
@Override
public void onReceivedMsg(ServerAddress source, ClusterAPIProtos.ClusterMessage msg) {
ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort());
log.info("Received msg [{}] from [{}]", msg.getMessageType().name(), serverAddress);
if(log.isDebugEnabled()){
log.info("MSG: ", msg);
}
switch (msg.getMessageType()) {
case CLUSTER_ACTOR_MESSAGE:
java.util.Optional<TbActorMsg> decodedMsg = actorContext.getEncodingService()

View File

@ -46,7 +46,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
}
@Override
protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) {
protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) {
updateSessionCtx(msg, SessionType.ASYNC);
if (firstMsg) {
toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m));

View File

@ -45,7 +45,7 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP
this.sessionId = sessionId;
}
protected abstract void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg);
protected abstract void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg);
protected abstract void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg);
@ -63,12 +63,12 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP
protected void cleanupSession(ActorContext ctx) {
}
protected void updateSessionCtx(ToDeviceActorSessionMsg msg, SessionType type) {
protected void updateSessionCtx(TransportToDeviceSessionActorMsg msg, SessionType type) {
sessionCtx = msg.getSessionMsg().getSessionContext();
deviceToDeviceActorMsgPrototype = new BasicDeviceToDeviceActorMsg(msg, type);
}
protected DeviceToDeviceActorMsg toDeviceMsg(ToDeviceActorSessionMsg msg) {
protected DeviceToDeviceActorMsg toDeviceMsg(TransportToDeviceSessionActorMsg msg) {
AdaptorToSessionActorMsg adaptorMsg = msg.getSessionMsg();
return new BasicDeviceToDeviceActorMsg(deviceToDeviceActorMsgPrototype, adaptorMsg.getMsg());
}

View File

@ -17,7 +17,6 @@ package org.thingsboard.server.actors.session;
import akka.actor.OneForOneStrategy;
import akka.actor.SupervisorStrategy;
import akka.japi.Function;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
@ -25,8 +24,8 @@ import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
import org.thingsboard.server.common.msg.session.SessionMsg;
import org.thingsboard.server.common.msg.session.SessionType;
@ -63,38 +62,37 @@ public class SessionActor extends ContextAwareActor {
@Override
protected boolean process(TbActorMsg msg) {
//TODO Move everything here, to work with TbActorMsg
return false;
}
@Override
public void onReceive(Object msg) throws Exception {
logger.debug("[{}] Processing: {}.", sessionId, msg);
if (msg instanceof ToDeviceActorSessionMsg) {
processDeviceMsg((ToDeviceActorSessionMsg) msg);
} else if (msg instanceof ToDeviceSessionActorMsg) {
processToDeviceMsg((ToDeviceSessionActorMsg) msg);
} else if (msg instanceof SessionTimeoutMsg) {
processTimeoutMsg((SessionTimeoutMsg) msg);
} else if (msg instanceof SessionCtrlMsg) {
processSessionCtrlMsg((SessionCtrlMsg) msg);
} else if (msg instanceof ClusterEventMsg) {
processClusterEvent((ClusterEventMsg) msg);
} else {
logger.warning("[{}] Unknown msg: {}", sessionId, msg);
switch (msg.getMsgType()) {
case TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG:
processTransportToSessionMsg((TransportToDeviceSessionActorMsg) msg);
break;
case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:
processActorsToSessionMsg((ActorSystemToDeviceSessionActorMsg) msg);
break;
case SESSION_TIMEOUT_MSG:
processTimeoutMsg((SessionTimeoutMsg) msg);
break;
case SESSION_CTRL_MSG:
processSessionCloseMsg((SessionCtrlMsg) msg);
break;
case CLUSTER_EVENT_MSG:
processClusterEvent((ClusterEventMsg) msg);
break;
default: return false;
}
return true;
}
private void processClusterEvent(ClusterEventMsg msg) {
processor.processClusterEvent(context(), msg);
}
private void processDeviceMsg(ToDeviceActorSessionMsg msg) {
private void processTransportToSessionMsg(TransportToDeviceSessionActorMsg msg) {
initProcessor(msg);
processor.processToDeviceActorMsg(context(), msg);
}
private void processToDeviceMsg(ToDeviceSessionActorMsg msg) {
private void processActorsToSessionMsg(ActorSystemToDeviceSessionActorMsg msg) {
processor.processToDeviceMsg(context(), msg.getMsg());
}
@ -106,7 +104,7 @@ public class SessionActor extends ContextAwareActor {
}
}
private void processSessionCtrlMsg(SessionCtrlMsg msg) {
private void processSessionCloseMsg(SessionCtrlMsg msg) {
if (processor != null) {
processor.processSessionCtrlMsg(context(), msg);
} else if (msg instanceof SessionCloseMsg) {
@ -116,7 +114,7 @@ public class SessionActor extends ContextAwareActor {
}
}
private void initProcessor(ToDeviceActorSessionMsg msg) {
private void initProcessor(TransportToDeviceSessionActorMsg msg) {
if (processor == null) {
SessionMsg sessionMsg = (SessionMsg) msg.getSessionMsg();
if (sessionMsg.getSessionContext().getSessionType() == SessionType.SYNC) {

View File

@ -17,7 +17,6 @@ package org.thingsboard.server.actors.session;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import akka.actor.*;
import org.thingsboard.server.actors.ActorSystemContext;
@ -33,8 +32,9 @@ import akka.event.Logging;
import akka.event.LoggingAdapter;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.core.SessionCloseMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
public class SessionManagerActor extends ContextAwareActor {
@ -104,7 +104,7 @@ public class SessionManagerActor extends ContextAwareActor {
}
private void forwardToSessionActor(SessionAwareMsg msg) {
if (msg instanceof ToDeviceSessionActorMsg || msg instanceof SessionCloseMsg) {
if (msg instanceof ActorSystemToDeviceSessionActorMsg || msg instanceof SessionCloseMsg) {
String sessionIdStr = msg.getSessionId().toUidStr();
ActorRef sessionActor = sessionActors.get(sessionIdStr);
if (sessionActor != null) {

View File

@ -22,7 +22,7 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.session.*;
import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.msg.session.ex.SessionException;
@ -41,7 +41,7 @@ class SyncMsgProcessor extends AbstractSessionActorMsgProcessor {
}
@Override
protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) {
protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) {
updateSessionCtx(msg, SessionType.SYNC);
pendingMsg = toDeviceMsg(msg);
pendingResponse = true;

View File

@ -17,13 +17,20 @@ package org.thingsboard.server.actors.shared;
import lombok.Data;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import java.io.Serializable;
@Data
public class SessionTimeoutMsg implements Serializable {
public class SessionTimeoutMsg implements Serializable, TbActorMsg {
private static final long serialVersionUID = 1L;
private final SessionId sessionId;
@Override
public MsgType getMsgType() {
return MsgType.SESSION_TIMEOUT_MSG;
}
}

View File

@ -107,7 +107,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
@Override
public void onServerAdded(ServerInstance server) {
log.debug("On server added event: {}", server);
log.info("On server added event: {}", server);
addNode(server);
logCircle();
}
@ -119,7 +119,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
@Override
public void onServerRemoved(ServerInstance server) {
log.debug("On server removed event: {}", server);
log.info("On server removed event: {}", server);
removeNode(server);
logCircle();
}

View File

@ -22,16 +22,11 @@ import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.SerializationUtils;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;

View File

@ -19,15 +19,7 @@ import io.grpc.stub.StreamObserver;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import java.util.UUID;

View File

@ -25,7 +25,7 @@
</encoder>
</appender>
<logger name="org.thingsboard.server" level="TRACE" />
<logger name="org.thingsboard.server" level="INFO" />
<logger name="akka" level="INFO" />
<root level="INFO">

View File

@ -101,6 +101,6 @@ public enum MsgType {
/**
* Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue.
*/
RULE_ENGINE_QUEUE_PUT_ACK_MSG;
RULE_ENGINE_QUEUE_PUT_ACK_MSG, ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG, TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG, SESSION_TIMEOUT_MSG, SESSION_CTRL_MSG;
}

View File

@ -24,7 +24,7 @@ import java.io.Serializable;
/**
* @author Andrew Shvayka
*/
public interface ToDeviceSessionActorMsg extends SessionAwareMsg, Serializable, TbActorMsg {
public interface ActorSystemToDeviceSessionActorMsg extends SessionAwareMsg, Serializable, TbActorMsg {
ToDeviceMsg getMsg();
}

View File

@ -19,12 +19,12 @@ import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
public class BasicToDeviceSessionActorMsg implements ToDeviceSessionActorMsg {
public class BasicActorSystemToDeviceSessionActorMsg implements ActorSystemToDeviceSessionActorMsg {
private final ToDeviceMsg msg;
private final SessionId sessionId;
public BasicToDeviceSessionActorMsg(ToDeviceMsg msg, SessionId sessionId) {
public BasicActorSystemToDeviceSessionActorMsg(ToDeviceMsg msg, SessionId sessionId) {
super();
this.msg = msg;
this.sessionId = sessionId;
@ -47,6 +47,6 @@ public class BasicToDeviceSessionActorMsg implements ToDeviceSessionActorMsg {
@Override
public MsgType getMsgType() {
return null;
return MsgType.ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG;
}
}

View File

@ -24,7 +24,7 @@ import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.msg.session.SessionType;
import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
import java.util.Optional;
@ -45,7 +45,7 @@ public class BasicDeviceToDeviceActorMsg implements DeviceToDeviceActorMsg {
this(null, other.getTenantId(), other.getCustomerId(), other.getDeviceId(), other.getSessionId(), other.getSessionType(), msg);
}
public BasicDeviceToDeviceActorMsg(ToDeviceActorSessionMsg msg, SessionType sessionType) {
public BasicDeviceToDeviceActorMsg(TransportToDeviceSessionActorMsg msg, SessionType sessionType) {
this(null, msg.getTenantId(), msg.getCustomerId(), msg.getDeviceId(), msg.getSessionId(), sessionType, msg.getSessionMsg().getMsg());
}

View File

@ -20,15 +20,16 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
public class BasicToDeviceActorSessionMsg implements ToDeviceActorSessionMsg {
public class BasicTransportToDeviceSessionActorMsg implements TransportToDeviceSessionActorMsg {
private final TenantId tenantId;
private final CustomerId customerId;
private final DeviceId deviceId;
private final AdaptorToSessionActorMsg msg;
public BasicToDeviceActorSessionMsg(Device device, AdaptorToSessionActorMsg msg) {
public BasicTransportToDeviceSessionActorMsg(Device device, AdaptorToSessionActorMsg msg) {
super();
this.tenantId = device.getTenantId();
this.customerId = device.getCustomerId();
@ -36,13 +37,6 @@ public class BasicToDeviceActorSessionMsg implements ToDeviceActorSessionMsg {
this.msg = msg;
}
public BasicToDeviceActorSessionMsg(ToDeviceActorSessionMsg deviceMsg) {
this.tenantId = deviceMsg.getTenantId();
this.customerId = deviceMsg.getCustomerId();
this.deviceId = deviceMsg.getDeviceId();
this.msg = deviceMsg.getSessionMsg();
}
@Override
public DeviceId getDeviceId() {
return deviceId;
@ -69,8 +63,12 @@ public class BasicToDeviceActorSessionMsg implements ToDeviceActorSessionMsg {
@Override
public String toString() {
return "BasicToDeviceActorSessionMsg [tenantId=" + tenantId + ", customerId=" + customerId + ", deviceId=" + deviceId + ", msg=" + msg
return "BasicTransportToDeviceSessionActorMsg [tenantId=" + tenantId + ", customerId=" + customerId + ", deviceId=" + deviceId + ", msg=" + msg
+ "]";
}
@Override
public MsgType getMsgType() {
return MsgType.TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG;
}
}

View File

@ -15,8 +15,9 @@
*/
package org.thingsboard.server.common.msg.session;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
public interface SessionCtrlMsg extends SessionAwareMsg {
public interface SessionCtrlMsg extends SessionAwareMsg, TbActorMsg {
}

View File

@ -15,12 +15,13 @@
*/
package org.thingsboard.server.common.msg.session;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.CustomerAwareMsg;
import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
public interface ToDeviceActorSessionMsg extends DeviceAwareMsg, CustomerAwareMsg, TenantAwareMsg, SessionAwareMsg {
public interface TransportToDeviceSessionActorMsg extends DeviceAwareMsg, CustomerAwareMsg, TenantAwareMsg, SessionAwareMsg, TbActorMsg {
AdaptorToSessionActorMsg getSessionMsg();

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.common.msg.session.ctrl;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
public class SessionCloseMsg implements SessionCtrlMsg {
@ -60,4 +61,8 @@ public class SessionCloseMsg implements SessionCtrlMsg {
return timeout;
}
@Override
public MsgType getMsgType() {
return MsgType.SESSION_CTRL_MSG;
}
}

View File

@ -59,7 +59,7 @@
<velocity.version>1.7</velocity.version>
<velocity-tools.version>2.0</velocity-tools.version>
<mail.version>1.4.3</mail.version>
<curator.version>2.11.0</curator.version>
<curator.version>4.0.1</curator.version>
<protobuf.version>3.0.2</protobuf.version>
<grpc.version>1.12.0</grpc.version>
<lombok.version>1.16.18</lombok.version>

View File

@ -38,8 +38,6 @@ import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
import org.thingsboard.server.transport.coap.session.CoapExchangeObserverProxy;
import org.thingsboard.server.transport.coap.session.CoapSessionCtx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ReflectionUtils;
@Slf4j
@ -186,7 +184,7 @@ public class CoapTransportResource extends CoapResource {
throw new IllegalArgumentException("Unsupported msg type: " + type);
}
log.trace("Processing msg: {}", msg);
processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg));
processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
} catch (AdaptorException e) {
log.debug("Failed to decode payload {}", e);
exchange.respond(ResponseCode.BAD_REQUEST, e.getMessage());

View File

@ -108,8 +108,8 @@ public class CoapServerTest {
@Override
public void process(SessionAwareMsg toActorMsg) {
if (toActorMsg instanceof ToDeviceActorSessionMsg) {
AdaptorToSessionActorMsg sessionMsg = ((ToDeviceActorSessionMsg) toActorMsg).getSessionMsg();
if (toActorMsg instanceof TransportToDeviceSessionActorMsg) {
AdaptorToSessionActorMsg sessionMsg = ((TransportToDeviceSessionActorMsg) toActorMsg).getSessionMsg();
try {
FromDeviceMsg deviceMsg = sessionMsg.getMsg();
ToDeviceMsg toDeviceMsg = null;

View File

@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
@ -219,7 +219,7 @@ public class DeviceApiController {
private void process(HttpSessionCtx ctx, FromDeviceMsg request) {
AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request);
processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg));
processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
}
private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {

View File

@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
import org.thingsboard.server.common.data.security.DeviceX509Credentials;
import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
@ -207,7 +207,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
}
if (msg != null) {
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
} else {
log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
ctx.close();
@ -227,11 +227,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try {
if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
grantedQoSList.add(getMinSupportedQos(reqQoS));
} else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
grantedQoSList.add(getMinSupportedQos(reqQoS));
} else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) {
grantedQoSList.add(getMinSupportedQos(reqQoS));
@ -261,10 +261,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try {
if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
} else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
} else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
deviceSessionCtx.setDisallowAttributeResponses();
}

View File

@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
@ -96,8 +96,8 @@ public class GatewaySessionCtx {
GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device);
devices.put(deviceName, ctx);
log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName);
processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
}
}
@ -136,7 +136,7 @@ public class GatewaySessionCtx {
JsonConverter.parseWithTs(request, element.getAsJsonObject());
}
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
}
} else {
@ -152,7 +152,7 @@ public class GatewaySessionCtx {
Integer requestId = jsonObj.get("id").getAsInt();
String data = jsonObj.get("data").toString();
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data))));
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
@ -174,7 +174,7 @@ public class GatewaySessionCtx {
JsonObject deviceData = deviceEntry.getValue().getAsJsonObject();
request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
}
} else {
@ -207,7 +207,7 @@ public class GatewaySessionCtx {
request = new BasicGetAttributesRequest(requestId, null, keys);
}
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
ack(msg);
} else {