diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index bf06a3a8ac..d10436075b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -67,7 +67,6 @@ import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.kafka.TbNodeIdProvider; import org.thingsboard.server.service.cluster.discovery.DiscoveryService; import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; -import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.encoding.DataDecodingEncodingService; import org.thingsboard.server.service.executors.ClusterRpcCallbackExecutorService; @@ -81,7 +80,7 @@ import org.thingsboard.server.service.script.JsInvokeService; import org.thingsboard.server.service.session.DeviceSessionCacheService; import org.thingsboard.server.service.state.DeviceStateService; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; -import org.thingsboard.server.service.transport.RuleEngineTransportService; +import org.thingsboard.server.service.transport.TbCoreToTransportService; import javax.annotation.Nullable; import java.io.IOException; @@ -122,10 +121,6 @@ public class ActorSystemContext { @Getter private ClusterRoutingService routingService; - @Autowired - @Getter - private ClusterRpcService rpcService; - @Autowired @Getter private DataDecodingEncodingService encodingService; @@ -241,7 +236,7 @@ public class ActorSystemContext { @Lazy @Autowired @Getter - private RuleEngineTransportService ruleEngineTransportService; + private TbCoreToTransportService tbCoreToTransportService; @Lazy @Autowired diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 2752b53090..6c0dde4044 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -234,24 +234,24 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { if (msg.hasSubscribeToRPC()) { processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToRPC()); } - if (msg.hasPostAttributes()) { - handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes()); - reportDeviceActivity = true; - } - if (msg.hasPostTelemetry()) { - handlePostTelemetryRequest(context, msg.getSessionInfo(), msg.getPostTelemetry()); - reportDeviceActivity = true; - } +// if (msg.hasPostAttributes()) { +// handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes()); +// reportDeviceActivity = true; +// } +// if (msg.hasPostTelemetry()) { +// handlePostTelemetryRequest(context, msg.getSessionInfo(), msg.getPostTelemetry()); +// reportDeviceActivity = true; +// } if (msg.hasGetAttributes()) { handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes()); } if (msg.hasToDeviceRPCCallResponse()) { processRpcResponses(context, msg.getSessionInfo(), msg.getToDeviceRPCCallResponse()); } - if (msg.hasToServerRPCCallRequest()) { - handleClientSideRPCRequest(context, msg.getSessionInfo(), msg.getToServerRPCCallRequest()); - reportDeviceActivity = true; - } +// if (msg.hasToServerRPCCallRequest()) { +// handleClientSideRPCRequest(context, msg.getSessionInfo(), msg.getToServerRPCCallRequest()); +// reportDeviceActivity = true; +// } if (msg.hasSubscriptionInfo()) { handleSessionActivity(context, msg.getSessionInfo(), msg.getSubscriptionInfo()); } @@ -260,6 +260,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } + //TODO: 2.5 move this as a notification to the queue; private void reportLogicalDeviceActivity() { systemContext.getDeviceStateService().onDeviceActivity(deviceId); } @@ -540,7 +541,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { .setSessionIdMSB(sessionId.getMostSignificantBits()) .setSessionIdLSB(sessionId.getLeastSignificantBits()) .setSessionCloseNotification(SessionCloseNotificationProto.getDefaultInstance()).build(); - systemContext.getRuleEngineTransportService().process(sessionMd.getSessionInfo().getNodeId(), msg); + systemContext.getTbCoreToTransportService().process(sessionMd.getSessionInfo().getNodeId(), msg); } void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) { @@ -556,7 +557,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { .setSessionIdMSB(sessionInfo.getSessionIdMSB()) .setSessionIdLSB(sessionInfo.getSessionIdLSB()) .setGetAttributesResponse(responseMsg).build(); - systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg); + systemContext.getTbCoreToTransportService().process(sessionInfo.getNodeId(), msg); } private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, String nodeId) { @@ -564,7 +565,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { .setSessionIdMSB(sessionId.getMostSignificantBits()) .setSessionIdLSB(sessionId.getLeastSignificantBits()) .setAttributeUpdateNotification(notificationMsg).build(); - systemContext.getRuleEngineTransportService().process(nodeId, msg); + systemContext.getTbCoreToTransportService().process(nodeId, msg); } private void sendToTransport(ToDeviceRpcRequestMsg rpcMsg, UUID sessionId, String nodeId) { @@ -572,7 +573,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { .setSessionIdMSB(sessionId.getMostSignificantBits()) .setSessionIdLSB(sessionId.getLeastSignificantBits()) .setToDeviceRequest(rpcMsg).build(); - systemContext.getRuleEngineTransportService().process(nodeId, msg); + systemContext.getTbCoreToTransportService().process(nodeId, msg); } private void sendToTransport(TransportProtos.ToServerRpcResponseMsg rpcMsg, UUID sessionId, String nodeId) { @@ -580,7 +581,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { .setSessionIdMSB(sessionId.getMostSignificantBits()) .setSessionIdLSB(sessionId.getLeastSignificantBits()) .setToServerResponse(rpcMsg).build(); - systemContext.getRuleEngineTransportService().process(nodeId, msg); + systemContext.getTbCoreToTransportService().process(nodeId, msg); } diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java index 43344ef374..e5da93b9e2 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java @@ -52,15 +52,12 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService; import org.thingsboard.server.service.cluster.discovery.ServerInstance; import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; import org.thingsboard.server.service.state.DeviceStateService; -import org.thingsboard.server.service.transport.RuleEngineStats; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE; diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java deleted file mode 100644 index 23b050634c..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Copyright © 2016-2020 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.cluster.rpc; - -import com.google.protobuf.ByteString; -import io.grpc.Server; -import io.grpc.ServerBuilder; -import io.grpc.stub.StreamObserver; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import org.thingsboard.server.actors.rpc.RpcBroadcastMsg; -import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg; -import org.thingsboard.server.common.msg.TbActorMsg; -import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.gen.cluster.ClusterAPIProtos; -import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc; -import org.thingsboard.server.service.cluster.discovery.ServerInstance; -import org.thingsboard.server.service.cluster.discovery.ServerInstanceService; -import org.thingsboard.server.service.encoding.DataDecodingEncodingService; - -import javax.annotation.PreDestroy; -import java.io.IOException; -import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * @author Andrew Shvayka - */ -@Service -@Slf4j -public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceImplBase implements ClusterRpcService { - - @Autowired - private ServerInstanceService instanceService; - - @Autowired - private DataDecodingEncodingService encodingService; - - private RpcMsgListener listener; - - private Server server; - - private ServerInstance instance; - - private ConcurrentMap>> pendingSessionMap = - new ConcurrentHashMap<>(); - - public void init(RpcMsgListener listener) { - this.listener = listener; - log.info("Initializing RPC service!"); - instance = instanceService.getSelf(); - server = ServerBuilder.forPort(instance.getPort()).addService(this).build(); - log.info("Going to start RPC server using port: {}", instance.getPort()); - try { - server.start(); - } catch (IOException e) { - log.error("Failed to start RPC server!", e); - throw new RuntimeException("Failed to start RPC server!"); - } - log.info("RPC service initialized!"); - } - - @Override - public void onSessionCreated(UUID msgUid, StreamObserver inputStream) { - BlockingQueue> queue = pendingSessionMap.remove(msgUid); - if (queue != null) { - try { - queue.put(inputStream); - } catch (InterruptedException e) { - log.warn("Failed to report created session!"); - Thread.currentThread().interrupt(); - } - } else { - log.warn("Failed to lookup pending session!"); - } - } - - @Override - public StreamObserver handleMsgs( - StreamObserver responseObserver) { - log.info("Processing new session."); - return createSession(new RpcSessionCreateRequestMsg(UUID.randomUUID(), null, responseObserver)); - } - - - @PreDestroy - public void stop() { - if (server != null) { - log.info("Going to onStop RPC server"); - server.shutdownNow(); - try { - server.awaitTermination(); - log.info("RPC server stopped!"); - } catch (InterruptedException e) { - log.warn("Failed to onStop RPC server!"); - Thread.currentThread().interrupt(); - } - } - } - - - @Override - public void broadcast(RpcBroadcastMsg msg) { - listener.onBroadcastMsg(msg); - } - - private StreamObserver createSession(RpcSessionCreateRequestMsg msg) { - BlockingQueue> queue = new ArrayBlockingQueue<>(1); - pendingSessionMap.put(msg.getMsgUid(), queue); - listener.onRpcSessionCreateRequestMsg(msg); - try { - StreamObserver observer = queue.take(); - log.info("Processed new session."); - return observer; - } catch (Exception e) { - log.info("Failed to process session.", e); - throw new RuntimeException(e); - } - } - - @Override - public void tell(ClusterAPIProtos.ClusterMessage message) { - listener.onSendMsg(message); - } - - @Override - public void tell(ServerAddress serverAddress, TbActorMsg actorMsg) { - listener.onSendMsg(encodingService.convertToProtoDataMessage(serverAddress, actorMsg)); - } - - @Override - public void tell(ServerAddress serverAddress, ClusterAPIProtos.MessageType msgType, byte[] data) { - ClusterAPIProtos.ClusterMessage msg = ClusterAPIProtos.ClusterMessage - .newBuilder() - .setServerAddress(ClusterAPIProtos.ServerAddress - .newBuilder() - .setHost(serverAddress.getHost()) - .setPort(serverAddress.getPort()) - .build()) - .setMessageType(msgType) - .setPayload(ByteString.copyFrom(data)).build(); - listener.onSendMsg(msg); - } -} diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java deleted file mode 100644 index 637924fca9..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Copyright © 2016-2020 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.cluster.rpc; - -import io.grpc.stub.StreamObserver; -import org.thingsboard.server.actors.rpc.RpcBroadcastMsg; -import org.thingsboard.server.common.msg.TbActorMsg; -import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.gen.cluster.ClusterAPIProtos; - -import java.util.UUID; - -/** - * @author Andrew Shvayka - */ -public interface ClusterRpcService { - - void init(RpcMsgListener listener); - - void broadcast(RpcBroadcastMsg msg); - - void onSessionCreated(UUID msgUid, StreamObserver inputStream); - - void tell(ClusterAPIProtos.ClusterMessage message); - - void tell(ServerAddress serverAddress, TbActorMsg actorMsg); - - void tell(ServerAddress serverAddress, ClusterAPIProtos.MessageType msgType, byte[] data); -} diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java deleted file mode 100644 index aff0bc4a41..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Copyright © 2016-2020 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.cluster.rpc; - -import io.grpc.Channel; -import io.grpc.ManagedChannel; -import io.grpc.stub.StreamObserver; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.common.msg.cluster.ServerType; -import org.thingsboard.server.gen.cluster.ClusterAPIProtos; - -import java.io.Closeable; -import java.util.UUID; - -/** - * @author Andrew Shvayka - */ -@Data -@Slf4j -public final class GrpcSession implements Closeable { - private final UUID sessionId; - private final boolean client; - private final GrpcSessionListener listener; - private final ManagedChannel channel; - private StreamObserver inputStream; - private StreamObserver outputStream; - - private boolean connected; - private ServerAddress remoteServer; - - public GrpcSession(GrpcSessionListener listener) { - this(null, listener, null); - } - - public GrpcSession(ServerAddress remoteServer, GrpcSessionListener listener, ManagedChannel channel) { - this.sessionId = UUID.randomUUID(); - this.listener = listener; - if (remoteServer != null) { - this.client = true; - this.connected = true; - this.remoteServer = remoteServer; - } else { - this.client = false; - } - this.channel = channel; - } - - public void initInputStream() { - this.inputStream = new StreamObserver() { - @Override - public void onNext(ClusterAPIProtos.ClusterMessage clusterMessage) { - if (!connected && clusterMessage.getMessageType() == ClusterAPIProtos.MessageType.CONNECT_RPC_MESSAGE) { - connected = true; - ServerAddress rpcAddress = new ServerAddress(clusterMessage.getServerAddress().getHost(), clusterMessage.getServerAddress().getPort(), ServerType.CORE); - remoteServer = new ServerAddress(rpcAddress.getHost(), rpcAddress.getPort(), ServerType.CORE); - listener.onConnected(GrpcSession.this); - } - if (connected) { - listener.onReceiveClusterGrpcMsg(GrpcSession.this, clusterMessage); - } - } - - @Override - public void onError(Throwable t) { - listener.onError(GrpcSession.this, t); - } - - @Override - public void onCompleted() { - outputStream.onCompleted(); - listener.onDisconnected(GrpcSession.this); - } - }; - } - - public void initOutputStream() { - if (client) { - listener.onConnected(GrpcSession.this); - } - } - - public void sendMsg(ClusterAPIProtos.ClusterMessage msg) { - if (connected) { - try { - outputStream.onNext(msg); - } catch (Throwable t) { - try { - outputStream.onError(t); - } catch (Throwable t2) { - } - listener.onError(GrpcSession.this, t); - } - } else { - log.warn("[{}] Failed to send message due to closed session!", sessionId); - } - } - - @Override - public void close() { - connected = false; - try { - outputStream.onCompleted(); - } catch (IllegalStateException e) { - log.debug("[{}] Failed to close output stream: {}", sessionId, e.getMessage()); - } - if (channel != null) { - channel.shutdownNow(); - } - } -} diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java deleted file mode 100644 index a6ecf967d6..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Copyright © 2016-2020 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.cluster.rpc; - -import org.thingsboard.server.gen.cluster.ClusterAPIProtos; - -/** - * @author Andrew Shvayka - */ -public interface GrpcSessionListener { - - void onConnected(GrpcSession session); - - void onDisconnected(GrpcSession session); - - void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage); - - void onError(GrpcSession session, Throwable t); -} diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java deleted file mode 100644 index 2ff37b39e0..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Copyright © 2016-2020 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.cluster.rpc; - -import org.thingsboard.server.actors.rpc.RpcBroadcastMsg; -import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg; -import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.gen.cluster.ClusterAPIProtos; - -/** - * @author Andrew Shvayka - */ - -public interface RpcMsgListener { - void onReceivedMsg(ServerAddress remoteServer, ClusterAPIProtos.ClusterMessage msg); - void onSendMsg(ClusterAPIProtos.ClusterMessage msg); - void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg); - void onBroadcastMsg(RpcBroadcastMsg msg); -} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index d919857318..4cd5c664fd 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -15,7 +15,6 @@ import org.thingsboard.server.common.TbProtoQueueMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.provider.TbCoreQueueProvider; -import org.thingsboard.server.service.transport.RuleEngineStats; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; import javax.annotation.PostConstruct; @@ -44,7 +43,7 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService { private final ActorSystemContext actorContext; private final TbQueueConsumer> consumer; - private final RuleEngineStats stats = new RuleEngineStats(); + private final TbCoreConsumerStats stats = new TbCoreConsumerStats(); private volatile ExecutorService mainConsumerExecutor; private volatile boolean stopped = false; @@ -71,10 +70,10 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService { ackMap.forEach((id, msg) -> { TbMsgCallback callback = new MsgPackCallback<>(id, processingTimeoutLatch, ackMap); try { - ToCoreMsg toRuleEngineMsg = msg.getValue(); - log.trace("Forwarding message to rule engine {}", toRuleEngineMsg); - if (toRuleEngineMsg.hasToDeviceActorMsg()) { - forwardToDeviceActor(toRuleEngineMsg.getToDeviceActorMsg(), callback); + ToCoreMsg toCoreMsg = msg.getValue(); + log.trace("Forwarding message to rule engine {}", toCoreMsg); + if (toCoreMsg.hasToDeviceActorMsg()) { + forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback); } else { callback.onSuccess(); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java new file mode 100644 index 0000000000..96b72843ed --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -0,0 +1,124 @@ +package org.thingsboard.server.service.queue; + +import akka.actor.ActorRef; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.TbQueueConsumer; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.common.TbProtoQueueMsg; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.provider.TbCoreQueueProvider; +import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +@Service +@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-rule-engine')") +@Slf4j +public class DefaultTbRuleEngineConsumerService implements TbRuleEngineConsumerService { + + + @Value("${queue.rule-engine.poll_interval}") + private long pollDuration; + @Value("${queue.rule-engine.pack_processing_timeout}") + private long packProcessingTimeout; + @Value("${queue.rule-engine.stats.enabled:false}") + private boolean statsEnabled; + + private final ActorSystemContext actorContext; + private final TbQueueConsumer> consumer; + private final TbCoreConsumerStats stats = new TbCoreConsumerStats(); + private volatile ExecutorService mainConsumerExecutor; + private volatile boolean stopped = false; + + public DefaultTbRuleEngineConsumerService(TbCoreQueueProvider tbCoreQueueProvider, ActorSystemContext actorContext) { + this.consumer = tbCoreQueueProvider.getToCoreMsgConsumer(); + this.actorContext = actorContext; + } + + @PostConstruct + public void init() { + this.consumer.subscribe(); + this.mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-consumer")); + } + + @EventListener(ApplicationReadyEvent.class) + public void onApplicationEvent(ApplicationReadyEvent event) { + mainConsumerExecutor.execute(() -> { + while (!stopped) { + try { + List> msgs = consumer.poll(pollDuration); + ConcurrentMap> ackMap = msgs.stream().collect( + Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity())); + CountDownLatch processingTimeoutLatch = new CountDownLatch(1); + ackMap.forEach((id, msg) -> { + TbMsgCallback callback = new MsgPackCallback<>(id, processingTimeoutLatch, ackMap); + try { + TransportProtos.ToCoreMsg toCoreMsg = msg.getValue(); + log.trace("Forwarding message to rule engine {}", toCoreMsg); + if (toCoreMsg.hasToDeviceActorMsg()) { + forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback); + } else { + callback.onSuccess(); + } + } catch (Throwable e) { + callback.onFailure(e); + } + }); + if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) { + ackMap.forEach((id, msg) -> log.warn("[{}] Timeout to process message: {}", id, msg.getValue())); + } + consumer.commit(); + } catch (Exception e) { + log.warn("Failed to obtain messages from queue.", e); + try { + Thread.sleep(pollDuration); + } catch (InterruptedException e2) { + log.trace("Failed to wait until the server has capacity to handle new requests", e2); + } + } + } + }); + } + + private void forwardToDeviceActor(TransportProtos.TransportToDeviceActorMsg toDeviceActorMsg, TbMsgCallback callback) { + if (statsEnabled) { + stats.log(toDeviceActorMsg); + } + actorContext.getAppActor().tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback), ActorRef.noSender()); + } + + @Scheduled(fixedDelayString = "${queue.core.stats.print_interval_ms}") + public void printStats() { + if (statsEnabled) { + stats.printStats(); + } + } + + @PreDestroy + public void destroy() { + stopped = true; + if (consumer != null) { + consumer.unsubscribe(); + } + if (mainConsumerExecutor != null) { + mainConsumerExecutor.shutdownNow(); + } + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java new file mode 100644 index 0000000000..e3bb3a3c4b --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java @@ -0,0 +1,70 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.gen.transport.TransportProtos; + +import java.util.concurrent.atomic.AtomicInteger; + +@Slf4j +public class TbCoreConsumerStats { + + private final AtomicInteger totalCounter = new AtomicInteger(0); + private final AtomicInteger sessionEventCounter = new AtomicInteger(0); + private final AtomicInteger getAttributesCounter = new AtomicInteger(0); + private final AtomicInteger subscribeToAttributesCounter = new AtomicInteger(0); + private final AtomicInteger subscribeToRPCCounter = new AtomicInteger(0); + private final AtomicInteger toDeviceRPCCallResponseCounter = new AtomicInteger(0); + private final AtomicInteger subscriptionInfoCounter = new AtomicInteger(0); + private final AtomicInteger claimDeviceCounter = new AtomicInteger(0); + + public void log(TransportProtos.TransportToDeviceActorMsg msg) { + totalCounter.incrementAndGet(); + if (msg.hasSessionEvent()) { + sessionEventCounter.incrementAndGet(); + } + if (msg.hasGetAttributes()) { + getAttributesCounter.incrementAndGet(); + } + if (msg.hasSubscribeToAttributes()) { + subscribeToAttributesCounter.incrementAndGet(); + } + if (msg.hasSubscribeToRPC()) { + subscribeToRPCCounter.incrementAndGet(); + } + if (msg.hasToDeviceRPCCallResponse()) { + toDeviceRPCCallResponseCounter.incrementAndGet(); + } + if (msg.hasSubscriptionInfo()) { + subscriptionInfoCounter.incrementAndGet(); + } + if (msg.hasClaimDevice()) { + claimDeviceCounter.incrementAndGet(); + } + } + + public void printStats() { + int total = totalCounter.getAndSet(0); + if (total > 0) { + log.info("Transport total [{}] sessionEvents [{}] getAttr [{}] subToAttr [{}] subToRpc [{}] toDevRpc [{}] subInfo [{}] claimDevice [{}]", + total, sessionEventCounter.getAndSet(0), + getAttributesCounter.getAndSet(0), subscribeToAttributesCounter.getAndSet(0), + subscribeToRPCCounter.getAndSet(0), toDeviceRPCCallResponseCounter.getAndSet(0), + subscriptionInfoCounter.getAndSet(0), claimDeviceCounter.getAndSet(0)); + } + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerService.java new file mode 100644 index 0000000000..691a54aaab --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerService.java @@ -0,0 +1,5 @@ +package org.thingsboard.server.service.queue; + +public interface TbRuleEngineConsumerService { + +} diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java similarity index 97% rename from application/src/main/java/org/thingsboard/server/service/transport/RuleEngineStats.java rename to application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java index dea6e6d33c..80d05628f2 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.transport; +package org.thingsboard.server.service.queue; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.gen.transport.TransportProtos; @@ -21,7 +21,7 @@ import org.thingsboard.server.gen.transport.TransportProtos; import java.util.concurrent.atomic.AtomicInteger; @Slf4j -public class RuleEngineStats { +public class TbRuleEngineConsumerStats { private final AtomicInteger totalCounter = new AtomicInteger(0); private final AtomicInteger sessionEventCounter = new AtomicInteger(0); diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java index 83353cea4b..980f5e6964 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java @@ -45,6 +45,7 @@ import org.thingsboard.server.kafka.TbNodeIdProvider; import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; import org.thingsboard.server.service.encoding.DataDecodingEncodingService; +import org.thingsboard.server.service.queue.TbCoreConsumerStats; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; import javax.annotation.PostConstruct; @@ -109,7 +110,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ private volatile boolean stopped = false; - private final RuleEngineStats stats = new RuleEngineStats(); + private final TbCoreConsumerStats stats = new TbCoreConsumerStats(); @PostConstruct public void init() { diff --git a/common/queue/src/main/proto/transport.proto b/common/queue/src/main/proto/transport.proto index 807aa571bf..892139a84a 100644 --- a/common/queue/src/main/proto/transport.proto +++ b/common/queue/src/main/proto/transport.proto @@ -23,7 +23,7 @@ option java_outer_classname = "TransportProtos"; * Transport Service Data Structures; */ message SessionInfoProto { - string ServiceId = 1; + string nodeId = 1; int64 sessionIdMSB = 2; int64 sessionIdLSB = 3; int64 tenantIdMSB = 4;