Implementation of Session Cache, TTL and few minor fixes
This commit is contained in:
parent
9a5e4db825
commit
4255bb56a3
@ -201,7 +201,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
|
||||
private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) {
|
||||
return entry -> {
|
||||
ToDeviceRpcRequestActorMsg requestActorMsg = entry.getValue().getMsg();
|
||||
ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
|
||||
ToDeviceRpcRequestBody body = request.getBody();
|
||||
if (request.isOneway()) {
|
||||
@ -486,6 +485,12 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
|
||||
sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
|
||||
sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
|
||||
if (subscriptionInfo.getAttributeSubscription()) {
|
||||
attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
|
||||
}
|
||||
if (subscriptionInfo.getRpcSubscription()) {
|
||||
rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
|
||||
}
|
||||
}
|
||||
dumpSessions();
|
||||
}
|
||||
@ -618,8 +623,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
}
|
||||
|
||||
private void restoreSessions() {
|
||||
logger.debug("[{}] Restoring sessions from cache", deviceId);
|
||||
TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId);
|
||||
if (sessionsDump.getSerializedSize() == 0) {
|
||||
logger.debug("[{}] No session information found", deviceId);
|
||||
return;
|
||||
}
|
||||
for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) {
|
||||
@ -627,18 +634,23 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
UUID sessionId = getSessionId(sessionInfoProto);
|
||||
SessionInfo sessionInfo = new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId());
|
||||
TransportProtos.SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo();
|
||||
SessionInfoMetaData sessionInfoMetaData = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());
|
||||
sessions.put(sessionId, sessionInfoMetaData);
|
||||
if (subInfo.getAttributeSubscription()) {
|
||||
rpcSubscriptions.put(sessionId, sessionInfo);
|
||||
}
|
||||
SessionInfoMetaData sessionMD = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());
|
||||
sessions.put(sessionId, sessionMD);
|
||||
if (subInfo.getAttributeSubscription()) {
|
||||
attributeSubscriptions.put(sessionId, sessionInfo);
|
||||
sessionMD.setSubscribedToAttributes(true);
|
||||
}
|
||||
if (subInfo.getRpcSubscription()) {
|
||||
rpcSubscriptions.put(sessionId, sessionInfo);
|
||||
sessionMD.setSubscribedToRPC(true);
|
||||
}
|
||||
logger.debug("[{}] Restored session: {}", deviceId, sessionMD);
|
||||
}
|
||||
logger.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
|
||||
}
|
||||
|
||||
private void dumpSessions() {
|
||||
logger.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
|
||||
List<TransportProtos.SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
|
||||
sessions.forEach((uuid, sessionMD) -> {
|
||||
if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) {
|
||||
@ -656,6 +668,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder()
|
||||
.setSessionInfo(sessionInfoProto)
|
||||
.setSubscriptionInfo(subscriptionInfoProto).build());
|
||||
logger.debug("[{}] Dumping session: {}", deviceId, sessionMD);
|
||||
});
|
||||
systemContext.getDeviceSessionCacheService()
|
||||
.put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder()
|
||||
|
||||
@ -32,7 +32,7 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
|
||||
private final ActorRef manager;
|
||||
private final ActorRef self;
|
||||
|
||||
public BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) {
|
||||
BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) {
|
||||
this.service = service;
|
||||
this.manager = manager;
|
||||
this.self = self;
|
||||
|
||||
@ -103,10 +103,10 @@ public class RpcManagerActor extends ContextAwareActor {
|
||||
ServerAddress address = new ServerAddress(msg.getServerAddress().getHost(), msg.getServerAddress().getPort(), ServerType.CORE);
|
||||
SessionActorInfo session = sessionActors.get(address);
|
||||
if (session != null) {
|
||||
log.debug("{} Forwarding msg to session actor", address);
|
||||
log.debug("{} Forwarding msg to session actor: {}", address, msg);
|
||||
session.getActor().tell(msg, ActorRef.noSender());
|
||||
} else {
|
||||
log.debug("{} Storing msg to pending queue", address);
|
||||
log.debug("{} Storing msg to pending queue: {}", address, msg);
|
||||
Queue<ClusterAPIProtos.ClusterMessage> queue = pendingMsgs.get(address);
|
||||
if (queue == null) {
|
||||
queue = new LinkedList<>();
|
||||
@ -116,7 +116,7 @@ public class RpcManagerActor extends ContextAwareActor {
|
||||
queue.add(msg);
|
||||
}
|
||||
} else {
|
||||
logger.warning("Cluster msg doesn't have set Server Address [{}]", msg);
|
||||
logger.warning("Cluster msg doesn't have server address [{}]", msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.actors.rpc;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
@ -88,8 +89,8 @@ public class RpcSessionActor extends ContextAwareActor {
|
||||
systemContext.getRpcService().onSessionCreated(msg.getMsgUid(), session.getInputStream());
|
||||
} else {
|
||||
// Client session
|
||||
Channel channel = ManagedChannelBuilder.forAddress(remoteServer.getHost(), remoteServer.getPort()).usePlaintext(true).build();
|
||||
session = new GrpcSession(remoteServer, listener);
|
||||
ManagedChannel channel = ManagedChannelBuilder.forAddress(remoteServer.getHost(), remoteServer.getPort()).usePlaintext().build();
|
||||
session = new GrpcSession(remoteServer, listener, channel);
|
||||
session.initInputStream();
|
||||
|
||||
ClusterRpcServiceGrpc.ClusterRpcServiceStub stub = ClusterRpcServiceGrpc.newStub(channel);
|
||||
|
||||
@ -108,7 +108,7 @@ public class TenantActor extends RuleChainManagerActor {
|
||||
@Override
|
||||
protected void broadcast(Object msg) {
|
||||
super.broadcast(msg);
|
||||
deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
|
||||
// deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
|
||||
}
|
||||
|
||||
private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
|
||||
|
||||
@ -15,6 +15,8 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.cluster.rpc;
|
||||
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -34,6 +36,7 @@ public final class GrpcSession implements Closeable {
|
||||
private final UUID sessionId;
|
||||
private final boolean client;
|
||||
private final GrpcSessionListener listener;
|
||||
private final ManagedChannel channel;
|
||||
private StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream;
|
||||
private StreamObserver<ClusterAPIProtos.ClusterMessage> outputStream;
|
||||
|
||||
@ -41,10 +44,10 @@ public final class GrpcSession implements Closeable {
|
||||
private ServerAddress remoteServer;
|
||||
|
||||
public GrpcSession(GrpcSessionListener listener) {
|
||||
this(null, listener);
|
||||
this(null, listener, null);
|
||||
}
|
||||
|
||||
public GrpcSession(ServerAddress remoteServer, GrpcSessionListener listener) {
|
||||
public GrpcSession(ServerAddress remoteServer, GrpcSessionListener listener, ManagedChannel channel) {
|
||||
this.sessionId = UUID.randomUUID();
|
||||
this.listener = listener;
|
||||
if (remoteServer != null) {
|
||||
@ -54,6 +57,7 @@ public final class GrpcSession implements Closeable {
|
||||
} else {
|
||||
this.client = false;
|
||||
}
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
public void initInputStream() {
|
||||
@ -105,5 +109,8 @@ public final class GrpcSession implements Closeable {
|
||||
} catch (IllegalStateException e) {
|
||||
log.debug("[{}] Failed to close output stream: {}", sessionId, e.getMessage());
|
||||
}
|
||||
if (channel != null) {
|
||||
channel.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -30,7 +30,6 @@ import org.thingsboard.rule.engine.api.NodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.NodeDefinition;
|
||||
import org.thingsboard.rule.engine.api.RuleNode;
|
||||
import org.thingsboard.rule.engine.api.TbRelationTypes;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentDescriptor;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||
import org.thingsboard.server.dao.component.ComponentDescriptorService;
|
||||
@ -52,6 +51,7 @@ import java.util.Set;
|
||||
@Slf4j
|
||||
public class AnnotationComponentDiscoveryService implements ComponentDiscoveryService {
|
||||
|
||||
public static final int MAX_OPTIMISITC_RETRIES = 3;
|
||||
@Value("${plugins.scan_packages}")
|
||||
private String[] scanPackages;
|
||||
|
||||
@ -81,17 +81,27 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
|
||||
private void registerRuleNodeComponents() {
|
||||
Set<BeanDefinition> ruleNodeBeanDefinitions = getBeanDefinitions(RuleNode.class);
|
||||
for (BeanDefinition def : ruleNodeBeanDefinitions) {
|
||||
try {
|
||||
String clazzName = def.getBeanClassName();
|
||||
Class<?> clazz = Class.forName(clazzName);
|
||||
RuleNode ruleNodeAnnotation = clazz.getAnnotation(RuleNode.class);
|
||||
ComponentType type = ruleNodeAnnotation.type();
|
||||
ComponentDescriptor component = scanAndPersistComponent(def, type);
|
||||
components.put(component.getClazz(), component);
|
||||
componentsMap.computeIfAbsent(type, k -> new ArrayList<>()).add(component);
|
||||
} catch (Exception e) {
|
||||
log.error("Can't initialize component {}, due to {}", def.getBeanClassName(), e.getMessage(), e);
|
||||
throw new RuntimeException(e);
|
||||
int retryCount = 0;
|
||||
Exception cause = null;
|
||||
while (retryCount < MAX_OPTIMISITC_RETRIES) {
|
||||
try {
|
||||
String clazzName = def.getBeanClassName();
|
||||
Class<?> clazz = Class.forName(clazzName);
|
||||
RuleNode ruleNodeAnnotation = clazz.getAnnotation(RuleNode.class);
|
||||
ComponentType type = ruleNodeAnnotation.type();
|
||||
ComponentDescriptor component = scanAndPersistComponent(def, type);
|
||||
components.put(component.getClazz(), component);
|
||||
componentsMap.computeIfAbsent(type, k -> new ArrayList<>()).add(component);
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
log.trace("Can't initialize component {}, due to {}", def.getBeanClassName(), e.getMessage(), e);
|
||||
cause = e;
|
||||
retryCount++;
|
||||
}
|
||||
}
|
||||
if (cause != null && retryCount == MAX_OPTIMISITC_RETRIES) {
|
||||
log.error("Can't initialize component {}, due to {}", def.getBeanClassName(), cause.getMessage(), cause);
|
||||
throw new RuntimeException(cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -133,66 +133,48 @@ public class LocalTransportService extends AbstractTransportService implements R
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
|
||||
}
|
||||
protected void doProcess(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback);
|
||||
}
|
||||
protected void doProcess(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback);
|
||||
}
|
||||
protected void doProcess(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);
|
||||
}
|
||||
protected void doProcess(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscriptionInfo(msg).build(), callback);
|
||||
}
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscriptionInfo(msg).build(), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
|
||||
}
|
||||
protected void doProcess(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);
|
||||
}
|
||||
protected void doProcess(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);
|
||||
}
|
||||
protected void doProcess(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
|
||||
}
|
||||
protected void doProcess(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
|
||||
forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.errors.TopicExistsException;
|
||||
import org.apache.kafka.common.header.Header;
|
||||
|
||||
import java.util.List;
|
||||
@ -75,7 +76,11 @@ public class TBKafkaProducerTemplate<T> {
|
||||
CreateTopicsResult result = admin.createTopic(new NewTopic(defaultTopic, 100, (short) 1));
|
||||
result.all().get();
|
||||
} catch (Exception e) {
|
||||
log.trace("Failed to create topic: {}", e.getMessage(), e);
|
||||
if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) {
|
||||
log.trace("[{}] Topic already exists: ", defaultTopic);
|
||||
} else {
|
||||
log.trace("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
//Maybe this should not be cached, but we don't plan to change size of partitions
|
||||
this.partitionInfoMap = new ConcurrentHashMap<>();
|
||||
|
||||
@ -68,44 +68,68 @@ public abstract class AbstractTransportService implements TransportService {
|
||||
|
||||
@Override
|
||||
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
doProcess(sessionInfo, msg, callback);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
doProcess(sessionInfo, msg, callback);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
doProcess(sessionInfo, msg, callback);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
doProcess(sessionInfo, msg, callback);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
|
||||
SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
|
||||
sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
|
||||
sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
|
||||
doProcess(sessionInfo, msg, callback);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
|
||||
SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
|
||||
sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
|
||||
sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
|
||||
doProcess(sessionInfo, msg, callback);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
doProcess(sessionInfo, msg, callback);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
doProcess(sessionInfo, msg, callback);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -113,6 +137,22 @@ public abstract class AbstractTransportService implements TransportService {
|
||||
reportActivityInternal(sessionInfo);
|
||||
}
|
||||
|
||||
protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback);
|
||||
|
||||
protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
|
||||
|
||||
protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback);
|
||||
|
||||
protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback);
|
||||
|
||||
protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback);
|
||||
|
||||
protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback);
|
||||
|
||||
protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback);
|
||||
|
||||
protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
|
||||
|
||||
private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
|
||||
UUID sessionId = toId(sessionInfo);
|
||||
SessionMetaData sessionMetaData = sessions.get(sessionId);
|
||||
|
||||
@ -216,103 +216,85 @@ public class RemoteTransportService extends AbstractTransportService {
|
||||
response -> callback.onSuccess(response.getGetOrCreateDeviceResponseMsg()), callback::onError, transportCallbackExecutor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setSessionEvent(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setSubscriptionInfo(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setSubscriptionInfo(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setPostTelemetry(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
protected void doProcess(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setSessionEvent(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setPostAttributes(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
protected void doProcess(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setPostTelemetry(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setGetAttributes(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
protected void doProcess(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setPostAttributes(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setSubscribeToAttributes(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
protected void doProcess(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setGetAttributes(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setSubscribeToRPC(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
protected void doProcess(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setSubscribeToAttributes(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setToDeviceRPCCallResponse(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
protected void doProcess(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setSubscribeToRPC(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, callback)) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setToServerRPCCallRequest(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
protected void doProcess(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setToDeviceRPCCallResponse(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doProcess(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
|
||||
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
|
||||
TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setToServerRPCCallRequest(msg).build()
|
||||
).build();
|
||||
send(sessionInfo, toRuleEngineMsg, callback);
|
||||
}
|
||||
|
||||
private static class TransportCallbackAdaptor implements Callback {
|
||||
|
||||
@ -40,7 +40,7 @@
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<logger name="org.thingsboard.server" level="INFO" />
|
||||
<logger name="org.thingsboard.server" level="TRACE" />
|
||||
<logger name="akka" level="INFO" />
|
||||
|
||||
<root level="INFO">
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user