Transport Implementation

This commit is contained in:
Andrii Shvaika 2020-03-12 14:37:38 +02:00
parent fa9194c1c1
commit c4c53bfbd8
15 changed files with 229 additions and 429 deletions

View File

@ -67,7 +67,6 @@ import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.kafka.TbNodeIdProvider;
import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
import org.thingsboard.server.service.executors.ClusterRpcCallbackExecutorService;
@ -81,7 +80,7 @@ import org.thingsboard.server.service.script.JsInvokeService;
import org.thingsboard.server.service.session.DeviceSessionCacheService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import org.thingsboard.server.service.transport.RuleEngineTransportService;
import org.thingsboard.server.service.transport.TbCoreToTransportService;
import javax.annotation.Nullable;
import java.io.IOException;
@ -122,10 +121,6 @@ public class ActorSystemContext {
@Getter
private ClusterRoutingService routingService;
@Autowired
@Getter
private ClusterRpcService rpcService;
@Autowired
@Getter
private DataDecodingEncodingService encodingService;
@ -241,7 +236,7 @@ public class ActorSystemContext {
@Lazy
@Autowired
@Getter
private RuleEngineTransportService ruleEngineTransportService;
private TbCoreToTransportService tbCoreToTransportService;
@Lazy
@Autowired

View File

@ -234,24 +234,24 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
if (msg.hasSubscribeToRPC()) {
processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToRPC());
}
if (msg.hasPostAttributes()) {
handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes());
reportDeviceActivity = true;
}
if (msg.hasPostTelemetry()) {
handlePostTelemetryRequest(context, msg.getSessionInfo(), msg.getPostTelemetry());
reportDeviceActivity = true;
}
// if (msg.hasPostAttributes()) {
// handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes());
// reportDeviceActivity = true;
// }
// if (msg.hasPostTelemetry()) {
// handlePostTelemetryRequest(context, msg.getSessionInfo(), msg.getPostTelemetry());
// reportDeviceActivity = true;
// }
if (msg.hasGetAttributes()) {
handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes());
}
if (msg.hasToDeviceRPCCallResponse()) {
processRpcResponses(context, msg.getSessionInfo(), msg.getToDeviceRPCCallResponse());
}
if (msg.hasToServerRPCCallRequest()) {
handleClientSideRPCRequest(context, msg.getSessionInfo(), msg.getToServerRPCCallRequest());
reportDeviceActivity = true;
}
// if (msg.hasToServerRPCCallRequest()) {
// handleClientSideRPCRequest(context, msg.getSessionInfo(), msg.getToServerRPCCallRequest());
// reportDeviceActivity = true;
// }
if (msg.hasSubscriptionInfo()) {
handleSessionActivity(context, msg.getSessionInfo(), msg.getSubscriptionInfo());
}
@ -260,6 +260,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
}
}
//TODO: 2.5 move this as a notification to the queue;
private void reportLogicalDeviceActivity() {
systemContext.getDeviceStateService().onDeviceActivity(deviceId);
}
@ -540,7 +541,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setSessionCloseNotification(SessionCloseNotificationProto.getDefaultInstance()).build();
systemContext.getRuleEngineTransportService().process(sessionMd.getSessionInfo().getNodeId(), msg);
systemContext.getTbCoreToTransportService().process(sessionMd.getSessionInfo().getNodeId(), msg);
}
void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) {
@ -556,7 +557,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
.setSessionIdMSB(sessionInfo.getSessionIdMSB())
.setSessionIdLSB(sessionInfo.getSessionIdLSB())
.setGetAttributesResponse(responseMsg).build();
systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
systemContext.getTbCoreToTransportService().process(sessionInfo.getNodeId(), msg);
}
private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, String nodeId) {
@ -564,7 +565,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setAttributeUpdateNotification(notificationMsg).build();
systemContext.getRuleEngineTransportService().process(nodeId, msg);
systemContext.getTbCoreToTransportService().process(nodeId, msg);
}
private void sendToTransport(ToDeviceRpcRequestMsg rpcMsg, UUID sessionId, String nodeId) {
@ -572,7 +573,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setToDeviceRequest(rpcMsg).build();
systemContext.getRuleEngineTransportService().process(nodeId, msg);
systemContext.getTbCoreToTransportService().process(nodeId, msg);
}
private void sendToTransport(TransportProtos.ToServerRpcResponseMsg rpcMsg, UUID sessionId, String nodeId) {
@ -580,7 +581,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setToServerResponse(rpcMsg).build();
systemContext.getRuleEngineTransportService().process(nodeId, msg);
systemContext.getTbCoreToTransportService().process(nodeId, msg);
}

View File

@ -52,15 +52,12 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.transport.RuleEngineStats;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE;

View File

@ -1,161 +0,0 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cluster.rpc;
import com.google.protobuf.ByteString;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
import org.thingsboard.server.service.cluster.discovery.ServerInstanceService;
import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @author Andrew Shvayka
*/
@Service
@Slf4j
public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceImplBase implements ClusterRpcService {
@Autowired
private ServerInstanceService instanceService;
@Autowired
private DataDecodingEncodingService encodingService;
private RpcMsgListener listener;
private Server server;
private ServerInstance instance;
private ConcurrentMap<UUID, BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>>> pendingSessionMap =
new ConcurrentHashMap<>();
public void init(RpcMsgListener listener) {
this.listener = listener;
log.info("Initializing RPC service!");
instance = instanceService.getSelf();
server = ServerBuilder.forPort(instance.getPort()).addService(this).build();
log.info("Going to start RPC server using port: {}", instance.getPort());
try {
server.start();
} catch (IOException e) {
log.error("Failed to start RPC server!", e);
throw new RuntimeException("Failed to start RPC server!");
}
log.info("RPC service initialized!");
}
@Override
public void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream) {
BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>> queue = pendingSessionMap.remove(msgUid);
if (queue != null) {
try {
queue.put(inputStream);
} catch (InterruptedException e) {
log.warn("Failed to report created session!");
Thread.currentThread().interrupt();
}
} else {
log.warn("Failed to lookup pending session!");
}
}
@Override
public StreamObserver<ClusterAPIProtos.ClusterMessage> handleMsgs(
StreamObserver<ClusterAPIProtos.ClusterMessage> responseObserver) {
log.info("Processing new session.");
return createSession(new RpcSessionCreateRequestMsg(UUID.randomUUID(), null, responseObserver));
}
@PreDestroy
public void stop() {
if (server != null) {
log.info("Going to onStop RPC server");
server.shutdownNow();
try {
server.awaitTermination();
log.info("RPC server stopped!");
} catch (InterruptedException e) {
log.warn("Failed to onStop RPC server!");
Thread.currentThread().interrupt();
}
}
}
@Override
public void broadcast(RpcBroadcastMsg msg) {
listener.onBroadcastMsg(msg);
}
private StreamObserver<ClusterAPIProtos.ClusterMessage> createSession(RpcSessionCreateRequestMsg msg) {
BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>> queue = new ArrayBlockingQueue<>(1);
pendingSessionMap.put(msg.getMsgUid(), queue);
listener.onRpcSessionCreateRequestMsg(msg);
try {
StreamObserver<ClusterAPIProtos.ClusterMessage> observer = queue.take();
log.info("Processed new session.");
return observer;
} catch (Exception e) {
log.info("Failed to process session.", e);
throw new RuntimeException(e);
}
}
@Override
public void tell(ClusterAPIProtos.ClusterMessage message) {
listener.onSendMsg(message);
}
@Override
public void tell(ServerAddress serverAddress, TbActorMsg actorMsg) {
listener.onSendMsg(encodingService.convertToProtoDataMessage(serverAddress, actorMsg));
}
@Override
public void tell(ServerAddress serverAddress, ClusterAPIProtos.MessageType msgType, byte[] data) {
ClusterAPIProtos.ClusterMessage msg = ClusterAPIProtos.ClusterMessage
.newBuilder()
.setServerAddress(ClusterAPIProtos.ServerAddress
.newBuilder()
.setHost(serverAddress.getHost())
.setPort(serverAddress.getPort())
.build())
.setMessageType(msgType)
.setPayload(ByteString.copyFrom(data)).build();
listener.onSendMsg(msg);
}
}

View File

@ -1,42 +0,0 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cluster.rpc;
import io.grpc.stub.StreamObserver;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import java.util.UUID;
/**
* @author Andrew Shvayka
*/
public interface ClusterRpcService {
void init(RpcMsgListener listener);
void broadcast(RpcBroadcastMsg msg);
void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream);
void tell(ClusterAPIProtos.ClusterMessage message);
void tell(ServerAddress serverAddress, TbActorMsg actorMsg);
void tell(ServerAddress serverAddress, ClusterAPIProtos.MessageType msgType, byte[] data);
}

View File

@ -1,125 +0,0 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cluster.rpc;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ServerType;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import java.io.Closeable;
import java.util.UUID;
/**
* @author Andrew Shvayka
*/
@Data
@Slf4j
public final class GrpcSession implements Closeable {
private final UUID sessionId;
private final boolean client;
private final GrpcSessionListener listener;
private final ManagedChannel channel;
private StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream;
private StreamObserver<ClusterAPIProtos.ClusterMessage> outputStream;
private boolean connected;
private ServerAddress remoteServer;
public GrpcSession(GrpcSessionListener listener) {
this(null, listener, null);
}
public GrpcSession(ServerAddress remoteServer, GrpcSessionListener listener, ManagedChannel channel) {
this.sessionId = UUID.randomUUID();
this.listener = listener;
if (remoteServer != null) {
this.client = true;
this.connected = true;
this.remoteServer = remoteServer;
} else {
this.client = false;
}
this.channel = channel;
}
public void initInputStream() {
this.inputStream = new StreamObserver<ClusterAPIProtos.ClusterMessage>() {
@Override
public void onNext(ClusterAPIProtos.ClusterMessage clusterMessage) {
if (!connected && clusterMessage.getMessageType() == ClusterAPIProtos.MessageType.CONNECT_RPC_MESSAGE) {
connected = true;
ServerAddress rpcAddress = new ServerAddress(clusterMessage.getServerAddress().getHost(), clusterMessage.getServerAddress().getPort(), ServerType.CORE);
remoteServer = new ServerAddress(rpcAddress.getHost(), rpcAddress.getPort(), ServerType.CORE);
listener.onConnected(GrpcSession.this);
}
if (connected) {
listener.onReceiveClusterGrpcMsg(GrpcSession.this, clusterMessage);
}
}
@Override
public void onError(Throwable t) {
listener.onError(GrpcSession.this, t);
}
@Override
public void onCompleted() {
outputStream.onCompleted();
listener.onDisconnected(GrpcSession.this);
}
};
}
public void initOutputStream() {
if (client) {
listener.onConnected(GrpcSession.this);
}
}
public void sendMsg(ClusterAPIProtos.ClusterMessage msg) {
if (connected) {
try {
outputStream.onNext(msg);
} catch (Throwable t) {
try {
outputStream.onError(t);
} catch (Throwable t2) {
}
listener.onError(GrpcSession.this, t);
}
} else {
log.warn("[{}] Failed to send message due to closed session!", sessionId);
}
}
@Override
public void close() {
connected = false;
try {
outputStream.onCompleted();
} catch (IllegalStateException e) {
log.debug("[{}] Failed to close output stream: {}", sessionId, e.getMessage());
}
if (channel != null) {
channel.shutdownNow();
}
}
}

View File

@ -1,32 +0,0 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cluster.rpc;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
/**
* @author Andrew Shvayka
*/
public interface GrpcSessionListener {
void onConnected(GrpcSession session);
void onDisconnected(GrpcSession session);
void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage);
void onError(GrpcSession session, Throwable t);
}

View File

@ -1,32 +0,0 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cluster.rpc;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
/**
* @author Andrew Shvayka
*/
public interface RpcMsgListener {
void onReceivedMsg(ServerAddress remoteServer, ClusterAPIProtos.ClusterMessage msg);
void onSendMsg(ClusterAPIProtos.ClusterMessage msg);
void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg);
void onBroadcastMsg(RpcBroadcastMsg msg);
}

View File

@ -15,7 +15,6 @@ import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.provider.TbCoreQueueProvider;
import org.thingsboard.server.service.transport.RuleEngineStats;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
import javax.annotation.PostConstruct;
@ -44,7 +43,7 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService {
private final ActorSystemContext actorContext;
private final TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> consumer;
private final RuleEngineStats stats = new RuleEngineStats();
private final TbCoreConsumerStats stats = new TbCoreConsumerStats();
private volatile ExecutorService mainConsumerExecutor;
private volatile boolean stopped = false;
@ -71,10 +70,10 @@ public class DefaultTbCoreConsumerService implements TbCoreConsumerService {
ackMap.forEach((id, msg) -> {
TbMsgCallback callback = new MsgPackCallback<>(id, processingTimeoutLatch, ackMap);
try {
ToCoreMsg toRuleEngineMsg = msg.getValue();
log.trace("Forwarding message to rule engine {}", toRuleEngineMsg);
if (toRuleEngineMsg.hasToDeviceActorMsg()) {
forwardToDeviceActor(toRuleEngineMsg.getToDeviceActorMsg(), callback);
ToCoreMsg toCoreMsg = msg.getValue();
log.trace("Forwarding message to rule engine {}", toCoreMsg);
if (toCoreMsg.hasToDeviceActorMsg()) {
forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
} else {
callback.onSuccess();
}

View File

@ -0,0 +1,124 @@
package org.thingsboard.server.service.queue;
import akka.actor.ActorRef;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.TbQueueConsumer;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.provider.TbCoreQueueProvider;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@Service
@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-rule-engine')")
@Slf4j
public class DefaultTbRuleEngineConsumerService implements TbRuleEngineConsumerService {
@Value("${queue.rule-engine.poll_interval}")
private long pollDuration;
@Value("${queue.rule-engine.pack_processing_timeout}")
private long packProcessingTimeout;
@Value("${queue.rule-engine.stats.enabled:false}")
private boolean statsEnabled;
private final ActorSystemContext actorContext;
private final TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> consumer;
private final TbCoreConsumerStats stats = new TbCoreConsumerStats();
private volatile ExecutorService mainConsumerExecutor;
private volatile boolean stopped = false;
public DefaultTbRuleEngineConsumerService(TbCoreQueueProvider tbCoreQueueProvider, ActorSystemContext actorContext) {
this.consumer = tbCoreQueueProvider.getToCoreMsgConsumer();
this.actorContext = actorContext;
}
@PostConstruct
public void init() {
this.consumer.subscribe();
this.mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-consumer"));
}
@EventListener(ApplicationReadyEvent.class)
public void onApplicationEvent(ApplicationReadyEvent event) {
mainConsumerExecutor.execute(() -> {
while (!stopped) {
try {
List<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> msgs = consumer.poll(pollDuration);
ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToCoreMsg>> ackMap = msgs.stream().collect(
Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
ackMap.forEach((id, msg) -> {
TbMsgCallback callback = new MsgPackCallback<>(id, processingTimeoutLatch, ackMap);
try {
TransportProtos.ToCoreMsg toCoreMsg = msg.getValue();
log.trace("Forwarding message to rule engine {}", toCoreMsg);
if (toCoreMsg.hasToDeviceActorMsg()) {
forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
} else {
callback.onSuccess();
}
} catch (Throwable e) {
callback.onFailure(e);
}
});
if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
ackMap.forEach((id, msg) -> log.warn("[{}] Timeout to process message: {}", id, msg.getValue()));
}
consumer.commit();
} catch (Exception e) {
log.warn("Failed to obtain messages from queue.", e);
try {
Thread.sleep(pollDuration);
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new requests", e2);
}
}
}
});
}
private void forwardToDeviceActor(TransportProtos.TransportToDeviceActorMsg toDeviceActorMsg, TbMsgCallback callback) {
if (statsEnabled) {
stats.log(toDeviceActorMsg);
}
actorContext.getAppActor().tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback), ActorRef.noSender());
}
@Scheduled(fixedDelayString = "${queue.core.stats.print_interval_ms}")
public void printStats() {
if (statsEnabled) {
stats.printStats();
}
}
@PreDestroy
public void destroy() {
stopped = true;
if (consumer != null) {
consumer.unsubscribe();
}
if (mainConsumerExecutor != null) {
mainConsumerExecutor.shutdownNow();
}
}
}

View File

@ -0,0 +1,70 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class TbCoreConsumerStats {
private final AtomicInteger totalCounter = new AtomicInteger(0);
private final AtomicInteger sessionEventCounter = new AtomicInteger(0);
private final AtomicInteger getAttributesCounter = new AtomicInteger(0);
private final AtomicInteger subscribeToAttributesCounter = new AtomicInteger(0);
private final AtomicInteger subscribeToRPCCounter = new AtomicInteger(0);
private final AtomicInteger toDeviceRPCCallResponseCounter = new AtomicInteger(0);
private final AtomicInteger subscriptionInfoCounter = new AtomicInteger(0);
private final AtomicInteger claimDeviceCounter = new AtomicInteger(0);
public void log(TransportProtos.TransportToDeviceActorMsg msg) {
totalCounter.incrementAndGet();
if (msg.hasSessionEvent()) {
sessionEventCounter.incrementAndGet();
}
if (msg.hasGetAttributes()) {
getAttributesCounter.incrementAndGet();
}
if (msg.hasSubscribeToAttributes()) {
subscribeToAttributesCounter.incrementAndGet();
}
if (msg.hasSubscribeToRPC()) {
subscribeToRPCCounter.incrementAndGet();
}
if (msg.hasToDeviceRPCCallResponse()) {
toDeviceRPCCallResponseCounter.incrementAndGet();
}
if (msg.hasSubscriptionInfo()) {
subscriptionInfoCounter.incrementAndGet();
}
if (msg.hasClaimDevice()) {
claimDeviceCounter.incrementAndGet();
}
}
public void printStats() {
int total = totalCounter.getAndSet(0);
if (total > 0) {
log.info("Transport total [{}] sessionEvents [{}] getAttr [{}] subToAttr [{}] subToRpc [{}] toDevRpc [{}] subInfo [{}] claimDevice [{}]",
total, sessionEventCounter.getAndSet(0),
getAttributesCounter.getAndSet(0), subscribeToAttributesCounter.getAndSet(0),
subscribeToRPCCounter.getAndSet(0), toDeviceRPCCallResponseCounter.getAndSet(0),
subscriptionInfoCounter.getAndSet(0), claimDeviceCounter.getAndSet(0));
}
}
}

View File

@ -0,0 +1,5 @@
package org.thingsboard.server.service.queue;
public interface TbRuleEngineConsumerService {
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.transport;
package org.thingsboard.server.service.queue;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -21,7 +21,7 @@ import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class RuleEngineStats {
public class TbRuleEngineConsumerStats {
private final AtomicInteger totalCounter = new AtomicInteger(0);
private final AtomicInteger sessionEventCounter = new AtomicInteger(0);

View File

@ -45,6 +45,7 @@ import org.thingsboard.server.kafka.TbNodeIdProvider;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
import org.thingsboard.server.service.queue.TbCoreConsumerStats;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
import javax.annotation.PostConstruct;
@ -109,7 +110,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
private volatile boolean stopped = false;
private final RuleEngineStats stats = new RuleEngineStats();
private final TbCoreConsumerStats stats = new TbCoreConsumerStats();
@PostConstruct
public void init() {

View File

@ -23,7 +23,7 @@ option java_outer_classname = "TransportProtos";
* Transport Service Data Structures;
*/
message SessionInfoProto {
string ServiceId = 1;
string nodeId = 1;
int64 sessionIdMSB = 2;
int64 sessionIdLSB = 3;
int64 tenantIdMSB = 4;