From 4255bb56a38d0edf1c671ce07321a6bf431fd7f2 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Tue, 30 Oct 2018 11:41:03 +0200 Subject: [PATCH] Implementation of Session Cache, TTL and few minor fixes --- .../device/DeviceActorMessageProcessor.java | 25 +++- .../actors/rpc/BasicRpcSessionListener.java | 2 +- .../server/actors/rpc/RpcManagerActor.java | 6 +- .../server/actors/rpc/RpcSessionActor.java | 5 +- .../server/actors/tenant/TenantActor.java | 2 +- .../service/cluster/rpc/GrpcSession.java | 11 +- .../AnnotationComponentDiscoveryService.java | 34 +++-- .../transport/LocalTransportService.java | 52 +++---- .../server/kafka/TBKafkaProducerTemplate.java | 7 +- .../service/AbstractTransportService.java | 60 ++++++-- .../service/RemoteTransportService.java | 130 ++++++++---------- docker/tb-node/conf/logback.xml | 2 +- 12 files changed, 188 insertions(+), 148 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 08a4454809..a16bd78a25 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -201,7 +201,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { private Consumer> processPendingRpc(ActorContext context, UUID sessionId, String nodeId, Set sentOneWayIds) { return entry -> { - ToDeviceRpcRequestActorMsg requestActorMsg = entry.getValue().getMsg(); ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg(); ToDeviceRpcRequestBody body = request.getBody(); if (request.isOneway()) { @@ -486,6 +485,12 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime()); sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription()); sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription()); + if (subscriptionInfo.getAttributeSubscription()) { + attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo()); + } + if (subscriptionInfo.getRpcSubscription()) { + rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo()); + } } dumpSessions(); } @@ -618,8 +623,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } private void restoreSessions() { + logger.debug("[{}] Restoring sessions from cache", deviceId); TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId); if (sessionsDump.getSerializedSize() == 0) { + logger.debug("[{}] No session information found", deviceId); return; } for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) { @@ -627,18 +634,23 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { UUID sessionId = getSessionId(sessionInfoProto); SessionInfo sessionInfo = new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId()); TransportProtos.SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo(); - SessionInfoMetaData sessionInfoMetaData = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime()); - sessions.put(sessionId, sessionInfoMetaData); - if (subInfo.getAttributeSubscription()) { - rpcSubscriptions.put(sessionId, sessionInfo); - } + SessionInfoMetaData sessionMD = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime()); + sessions.put(sessionId, sessionMD); if (subInfo.getAttributeSubscription()) { attributeSubscriptions.put(sessionId, sessionInfo); + sessionMD.setSubscribedToAttributes(true); } + if (subInfo.getRpcSubscription()) { + rpcSubscriptions.put(sessionId, sessionInfo); + sessionMD.setSubscribedToRPC(true); + } + logger.debug("[{}] Restored session: {}", deviceId, sessionMD); } + logger.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size()); } private void dumpSessions() { + logger.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size()); List sessionsList = new ArrayList<>(sessions.size()); sessions.forEach((uuid, sessionMD) -> { if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) { @@ -656,6 +668,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder() .setSessionInfo(sessionInfoProto) .setSubscriptionInfo(subscriptionInfoProto).build()); + logger.debug("[{}] Dumping session: {}", deviceId, sessionMD); }); systemContext.getDeviceSessionCacheService() .put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder() diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java index 14bb636830..1eba066960 100644 --- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java +++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java @@ -32,7 +32,7 @@ public class BasicRpcSessionListener implements GrpcSessionListener { private final ActorRef manager; private final ActorRef self; - public BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) { + BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) { this.service = service; this.manager = manager; this.self = self; diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java index 66174e1e4b..31320ce5d2 100644 --- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java @@ -103,10 +103,10 @@ public class RpcManagerActor extends ContextAwareActor { ServerAddress address = new ServerAddress(msg.getServerAddress().getHost(), msg.getServerAddress().getPort(), ServerType.CORE); SessionActorInfo session = sessionActors.get(address); if (session != null) { - log.debug("{} Forwarding msg to session actor", address); + log.debug("{} Forwarding msg to session actor: {}", address, msg); session.getActor().tell(msg, ActorRef.noSender()); } else { - log.debug("{} Storing msg to pending queue", address); + log.debug("{} Storing msg to pending queue: {}", address, msg); Queue queue = pendingMsgs.get(address); if (queue == null) { queue = new LinkedList<>(); @@ -116,7 +116,7 @@ public class RpcManagerActor extends ContextAwareActor { queue.add(msg); } } else { - logger.warning("Cluster msg doesn't have set Server Address [{}]", msg); + logger.warning("Cluster msg doesn't have server address [{}]", msg); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java index c9cf8696e5..86509ca08e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java @@ -18,6 +18,7 @@ package org.thingsboard.server.actors.rpc; import akka.event.Logging; import akka.event.LoggingAdapter; import io.grpc.Channel; +import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; import org.thingsboard.server.actors.ActorSystemContext; @@ -88,8 +89,8 @@ public class RpcSessionActor extends ContextAwareActor { systemContext.getRpcService().onSessionCreated(msg.getMsgUid(), session.getInputStream()); } else { // Client session - Channel channel = ManagedChannelBuilder.forAddress(remoteServer.getHost(), remoteServer.getPort()).usePlaintext(true).build(); - session = new GrpcSession(remoteServer, listener); + ManagedChannel channel = ManagedChannelBuilder.forAddress(remoteServer.getHost(), remoteServer.getPort()).usePlaintext().build(); + session = new GrpcSession(remoteServer, listener, channel); session.initInputStream(); ClusterRpcServiceGrpc.ClusterRpcServiceStub stub = ClusterRpcServiceGrpc.newStub(channel); diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index f822e4e1f0..721d828fd6 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -108,7 +108,7 @@ public class TenantActor extends RuleChainManagerActor { @Override protected void broadcast(Object msg) { super.broadcast(msg); - deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender())); +// deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender())); } private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) { diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java index 55cf52c37b..dd0a995c84 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.service.cluster.rpc; +import io.grpc.Channel; +import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -34,6 +36,7 @@ public final class GrpcSession implements Closeable { private final UUID sessionId; private final boolean client; private final GrpcSessionListener listener; + private final ManagedChannel channel; private StreamObserver inputStream; private StreamObserver outputStream; @@ -41,10 +44,10 @@ public final class GrpcSession implements Closeable { private ServerAddress remoteServer; public GrpcSession(GrpcSessionListener listener) { - this(null, listener); + this(null, listener, null); } - public GrpcSession(ServerAddress remoteServer, GrpcSessionListener listener) { + public GrpcSession(ServerAddress remoteServer, GrpcSessionListener listener, ManagedChannel channel) { this.sessionId = UUID.randomUUID(); this.listener = listener; if (remoteServer != null) { @@ -54,6 +57,7 @@ public final class GrpcSession implements Closeable { } else { this.client = false; } + this.channel = channel; } public void initInputStream() { @@ -105,5 +109,8 @@ public final class GrpcSession implements Closeable { } catch (IllegalStateException e) { log.debug("[{}] Failed to close output stream: {}", sessionId, e.getMessage()); } + if (channel != null) { + channel.shutdownNow(); + } } } diff --git a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java index f3ceed2069..5a1e55988d 100644 --- a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java +++ b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java @@ -30,7 +30,6 @@ import org.thingsboard.rule.engine.api.NodeConfiguration; import org.thingsboard.rule.engine.api.NodeDefinition; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbRelationTypes; -import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.plugin.ComponentDescriptor; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.dao.component.ComponentDescriptorService; @@ -52,6 +51,7 @@ import java.util.Set; @Slf4j public class AnnotationComponentDiscoveryService implements ComponentDiscoveryService { + public static final int MAX_OPTIMISITC_RETRIES = 3; @Value("${plugins.scan_packages}") private String[] scanPackages; @@ -81,17 +81,27 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe private void registerRuleNodeComponents() { Set ruleNodeBeanDefinitions = getBeanDefinitions(RuleNode.class); for (BeanDefinition def : ruleNodeBeanDefinitions) { - try { - String clazzName = def.getBeanClassName(); - Class clazz = Class.forName(clazzName); - RuleNode ruleNodeAnnotation = clazz.getAnnotation(RuleNode.class); - ComponentType type = ruleNodeAnnotation.type(); - ComponentDescriptor component = scanAndPersistComponent(def, type); - components.put(component.getClazz(), component); - componentsMap.computeIfAbsent(type, k -> new ArrayList<>()).add(component); - } catch (Exception e) { - log.error("Can't initialize component {}, due to {}", def.getBeanClassName(), e.getMessage(), e); - throw new RuntimeException(e); + int retryCount = 0; + Exception cause = null; + while (retryCount < MAX_OPTIMISITC_RETRIES) { + try { + String clazzName = def.getBeanClassName(); + Class clazz = Class.forName(clazzName); + RuleNode ruleNodeAnnotation = clazz.getAnnotation(RuleNode.class); + ComponentType type = ruleNodeAnnotation.type(); + ComponentDescriptor component = scanAndPersistComponent(def, type); + components.put(component.getClazz(), component); + componentsMap.computeIfAbsent(type, k -> new ArrayList<>()).add(component); + break; + } catch (Exception e) { + log.trace("Can't initialize component {}, due to {}", def.getBeanClassName(), e.getMessage(), e); + cause = e; + retryCount++; + } + } + if (cause != null && retryCount == MAX_OPTIMISITC_RETRIES) { + log.error("Can't initialize component {}, due to {}", def.getBeanClassName(), cause.getMessage(), cause); + throw new RuntimeException(cause); } } } diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java index d3782e8ffc..3d12938903 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java @@ -133,66 +133,48 @@ public class LocalTransportService extends AbstractTransportService implements R } @Override - public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback); - } + protected void doProcess(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback callback) { + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback); } @Override - public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback); - } + protected void doProcess(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback callback) { + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback); } @Override - public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback); - } + protected void doProcess(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback callback) { + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback); } @Override - public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback); - } + protected void doProcess(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback callback) { + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback); } @Override public void process(SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscriptionInfo(msg).build(), callback); - } + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscriptionInfo(msg).build(), callback); } @Override - public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback); - } + protected void doProcess(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback callback) { + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback); } @Override - public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback); - } + protected void doProcess(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback callback) { + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback); } @Override - public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback); - } + protected void doProcess(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback callback) { + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback); } @Override - public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback); - } + protected void doProcess(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback callback) { + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java index 610a49007d..bd42f310a6 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java @@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.header.Header; import java.util.List; @@ -75,7 +76,11 @@ public class TBKafkaProducerTemplate { CreateTopicsResult result = admin.createTopic(new NewTopic(defaultTopic, 100, (short) 1)); result.all().get(); } catch (Exception e) { - log.trace("Failed to create topic: {}", e.getMessage(), e); + if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) { + log.trace("[{}] Topic already exists: ", defaultTopic); + } else { + log.trace("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e); + } } //Maybe this should not be cached, but we don't plan to change size of partitions this.partitionInfoMap = new ConcurrentHashMap<>(); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java index 3bb84ecbc4..fad19541b4 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java @@ -68,44 +68,68 @@ public abstract class AbstractTransportService implements TransportService { @Override public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback callback) { - reportActivityInternal(sessionInfo); + if (checkLimits(sessionInfo, callback)) { + reportActivityInternal(sessionInfo); + doProcess(sessionInfo, msg, callback); + } } @Override public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback callback) { - reportActivityInternal(sessionInfo); + if (checkLimits(sessionInfo, callback)) { + reportActivityInternal(sessionInfo); + doProcess(sessionInfo, msg, callback); + } } @Override public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback callback) { - reportActivityInternal(sessionInfo); + if (checkLimits(sessionInfo, callback)) { + reportActivityInternal(sessionInfo); + doProcess(sessionInfo, msg, callback); + } } @Override public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback callback) { - reportActivityInternal(sessionInfo); + if (checkLimits(sessionInfo, callback)) { + reportActivityInternal(sessionInfo); + doProcess(sessionInfo, msg, callback); + } } @Override public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback callback) { - SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); - sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe()); + if (checkLimits(sessionInfo, callback)) { + SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); + sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe()); + doProcess(sessionInfo, msg, callback); + } } @Override public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback callback) { - SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); - sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe()); + if (checkLimits(sessionInfo, callback)) { + SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); + sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe()); + doProcess(sessionInfo, msg, callback); + } } @Override public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback callback) { - reportActivityInternal(sessionInfo); + if (checkLimits(sessionInfo, callback)) { + reportActivityInternal(sessionInfo); + doProcess(sessionInfo, msg, callback); + } } @Override public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback callback) { - reportActivityInternal(sessionInfo); + if (checkLimits(sessionInfo, callback)) { + reportActivityInternal(sessionInfo); + doProcess(sessionInfo, msg, callback); + } } @Override @@ -113,6 +137,22 @@ public abstract class AbstractTransportService implements TransportService { reportActivityInternal(sessionInfo); } + protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback callback); + + protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback callback); + + protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback callback); + + protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback callback); + + protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback callback); + + protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback callback); + + protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback callback); + + protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback callback); + private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) { UUID sessionId = toId(sessionInfo); SessionMetaData sessionMetaData = sessions.get(sessionId); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java index 091096106b..4b11bf58c9 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java @@ -216,103 +216,85 @@ public class RemoteTransportService extends AbstractTransportService { response -> callback.onSuccess(response.getGetOrCreateDeviceResponseMsg()), callback::onError, transportCallbackExecutor); } - @Override - public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setSessionEvent(msg).build() - ).build(); - send(sessionInfo, toRuleEngineMsg, callback); - } - } - @Override public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setSubscriptionInfo(msg).build() - ).build(); - send(sessionInfo, toRuleEngineMsg, callback); - } + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setSubscriptionInfo(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); } @Override - public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setPostTelemetry(msg).build() - ).build(); - send(sessionInfo, toRuleEngineMsg, callback); - } + protected void doProcess(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback callback) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setSessionEvent(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); } @Override - public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setPostAttributes(msg).build() - ).build(); - send(sessionInfo, toRuleEngineMsg, callback); - } + protected void doProcess(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback callback) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setPostTelemetry(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); } @Override - public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setGetAttributes(msg).build() - ).build(); - send(sessionInfo, toRuleEngineMsg, callback); - } + protected void doProcess(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback callback) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setPostAttributes(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); } @Override - public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setSubscribeToAttributes(msg).build() - ).build(); - send(sessionInfo, toRuleEngineMsg, callback); - } + protected void doProcess(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback callback) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setGetAttributes(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); } @Override - public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setSubscribeToRPC(msg).build() - ).build(); - send(sessionInfo, toRuleEngineMsg, callback); - } + protected void doProcess(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback callback) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setSubscribeToAttributes(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); } @Override - public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setToDeviceRPCCallResponse(msg).build() - ).build(); - send(sessionInfo, toRuleEngineMsg, callback); - } + protected void doProcess(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback callback) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setSubscribeToRPC(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); } @Override - public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback callback) { - if (checkLimits(sessionInfo, callback)) { - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setToServerRPCCallRequest(msg).build() - ).build(); - send(sessionInfo, toRuleEngineMsg, callback); - } + protected void doProcess(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback callback) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setToDeviceRPCCallResponse(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); + } + + @Override + protected void doProcess(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback callback) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setToServerRPCCallRequest(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); } private static class TransportCallbackAdaptor implements Callback { diff --git a/docker/tb-node/conf/logback.xml b/docker/tb-node/conf/logback.xml index 1c69f537bc..8bf38eb287 100644 --- a/docker/tb-node/conf/logback.xml +++ b/docker/tb-node/conf/logback.xml @@ -40,7 +40,7 @@ - +