Refactoring of Transport Interfaces

This commit is contained in:
Andrii Shvaika 2020-03-10 14:03:06 +02:00
parent a39e8c3756
commit 7b2bdeab23
11 changed files with 234 additions and 183 deletions

View File

@ -94,6 +94,10 @@
<artifactId>mockito-all</artifactId> <artifactId>mockito-all</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -4,6 +4,6 @@ import com.google.common.util.concurrent.ListenableFuture;
public interface TbQueueRequestTemplate<Request extends TbQueueMsg, Response extends TbQueueMsg> { public interface TbQueueRequestTemplate<Request extends TbQueueMsg, Response extends TbQueueMsg> {
ListenableFuture<Response> send(String key, Request request); ListenableFuture<Response> send(Request request);
} }

View File

@ -0,0 +1,21 @@
package org.thingsboard.server.common;
import org.thingsboard.server.TbQueueMsgHeaders;
import java.util.HashMap;
import java.util.Map;
public class DefaultTbQueueMsgHeaders implements TbQueueMsgHeaders {
private final Map<String, byte[]> data = new HashMap<>();
@Override
public byte[] put(String key, byte[] value) {
return data.put(key, value);
}
@Override
public byte[] get(String key) {
return data.get(key);
}
}

View File

@ -133,7 +133,7 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
} }
@Override @Override
public ListenableFuture<Response> send(String key, Request request) { public ListenableFuture<Response> send(Request request) {
if (tickSize > maxPendingRequests) { if (tickSize > maxPendingRequests) {
return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!")); return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
} }
@ -143,7 +143,7 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
SettableFuture<Response> future = SettableFuture.create(); SettableFuture<Response> future = SettableFuture.create();
ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future); ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
pendingRequests.putIfAbsent(requestId, responseMetaData); pendingRequests.putIfAbsent(requestId, responseMetaData);
log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, key, responseMetaData.expTime); log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);
requestTemplate.send(request, new TbQueueCallback() { requestTemplate.send(request, new TbQueueCallback() {
@Override @Override
public void onSuccess(TbQueueMsgMetadata metadata) { public void onSuccess(TbQueueMsgMetadata metadata) {

View File

@ -0,0 +1,40 @@
package org.thingsboard.server.common;
import lombok.Data;
import org.thingsboard.server.TbQueueMsg;
import org.thingsboard.server.TbQueueMsgHeaders;
import java.util.UUID;
@Data
public class TbProtoQueueMsg<T extends com.google.protobuf.GeneratedMessageV3> implements TbQueueMsg {
private final UUID key;
private final T value;
private final DefaultTbQueueMsgHeaders headers;
public TbProtoQueueMsg(UUID key, T value) {
this(key, value, new DefaultTbQueueMsgHeaders());
}
public TbProtoQueueMsg(UUID key, T value, DefaultTbQueueMsgHeaders headers) {
this.key = key;
this.value = value;
this.headers = headers;
}
@Override
public UUID getKey() {
return key;
}
@Override
public TbQueueMsgHeaders getHeaders() {
return headers;
}
@Override
public byte[] getData() {
return value.toByteArray();
}
}

View File

@ -0,0 +1,31 @@
package org.thingsboard.server.common.transport.queue;
import org.springframework.stereotype.Component;
import org.thingsboard.server.TbQueueConsumer;
import org.thingsboard.server.TbQueueProducer;
import org.thingsboard.server.TbQueueRequestTemplate;
import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
@Component
public class KafkaTransportQueueProvider implements TransportQueueProvider {
@Override
public TbQueueRequestTemplate<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> getTransportApiRequestTemplate() {
return null;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> getRuleEngineMsgProducer() {
return null;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> getTbCoreMsgProducer() {
return null;
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> getTransportNotificationsConsumer() {
return null;
}
}

View File

@ -1,31 +0,0 @@
package org.thingsboard.server.common.transport.queue;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
public abstract class TransportApiCall {
protected byte[] uuidToBytes(UUID uuid) {
ByteBuffer buf = ByteBuffer.allocate(16);
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
return buf.array();
}
protected static UUID bytesToUuid(byte[] bytes) {
ByteBuffer bb = ByteBuffer.wrap(bytes);
long firstLong = bb.getLong();
long secondLong = bb.getLong();
return new UUID(firstLong, secondLong);
}
protected byte[] stringToBytes(String string) {
return string.getBytes(StandardCharsets.UTF_8);
}
protected String bytesToString(byte[] data) {
return new String(data, StandardCharsets.UTF_8);
}
}

View File

@ -1,40 +0,0 @@
package org.thingsboard.server.common.transport.queue;
import org.thingsboard.server.TbQueueMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class TransportApiCallRequest extends TransportApiCall implements TbQueueMsg {
public static final String REQUEST_ID_HEADER = "requestId";
public static final String RESPONSE_TOPIC_HEADER = "responseTopic";
private final UUID requestId;
private final Map<String, byte[]> headers;
private final TransportProtos.TransportApiRequestMsg msg;
public TransportApiCallRequest(UUID requestId, String responseTopic, TransportProtos.TransportApiRequestMsg msg) {
this.requestId = requestId;
this.headers = new HashMap<>();
this.headers.put(REQUEST_ID_HEADER, uuidToBytes(requestId));
this.headers.put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTopic));
this.msg = msg;
}
@Override
public UUID getKey() {
return requestId;
}
@Override
public Map<String, byte[]> getHeaders() {
return null;
}
@Override
public byte[] getData() {
return msg.toByteArray();
}
}

View File

@ -1,19 +1,22 @@
package org.thingsboard.server.common.transport.queue; package org.thingsboard.server.common.transport.queue;
import org.thingsboard.server.TbQueueConsumer; import org.thingsboard.server.TbQueueConsumer;
import org.thingsboard.server.TbQueueMsg;
import org.thingsboard.server.TbQueueProducer; import org.thingsboard.server.TbQueueProducer;
import org.thingsboard.server.TbQueueRequestTemplate;
import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
public interface TransportQueueProvider { public interface TransportQueueProvider {
TbQueueProducer<TransportApiCallRequest> getTransportApiCallRequestsProducer(); TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> getTransportApiRequestTemplate();
TbQueueConsumer<TbQueueMsg> getTransportApiCallResponsesConsumer(); TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> getRuleEngineMsgProducer();
TbQueueProducer<TbQueueMsg> getRuleEngineMsgProducer(); TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> getTbCoreMsgProducer();
TbQueueProducer<TbQueueMsg> getTbCoreMsgProducer(); TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsConsumer();
TbQueueConsumer<TbQueueMsg> getTransportNotificationsConsumer();
} }

View File

@ -19,6 +19,10 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.TbQueueConsumer;
import org.thingsboard.server.TbQueueProducer;
import org.thingsboard.server.TbQueueRequestTemplate;
import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
@ -29,6 +33,10 @@ import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.queue.TransportQueueProvider; import org.thingsboard.server.common.transport.queue.TransportQueueProvider;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import java.util.Random; import java.util.Random;
import java.util.UUID; import java.util.UUID;
@ -54,6 +62,15 @@ public abstract class AbstractTransportService implements TransportService {
@Autowired @Autowired
private TransportQueueProvider queueProvider; private TransportQueueProvider queueProvider;
protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiRequestTemplate;
protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;
protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> tbCoreMsgProducer;
protected TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> transportNotificationsConsumer;
protected ScheduledExecutorService schedulerExecutor; protected ScheduledExecutorService schedulerExecutor;
protected ExecutorService transportCallbackExecutor; protected ExecutorService transportCallbackExecutor;
@ -296,8 +313,8 @@ public abstract class AbstractTransportService implements TransportService {
return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
} }
protected String getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) { protected UUID getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) {
return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString(); return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB());
} }
public void init() { public void init() {
@ -309,6 +326,10 @@ public abstract class AbstractTransportService implements TransportService {
this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler")); this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler"));
this.transportCallbackExecutor = Executors.newWorkStealingPool(20); this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
transportApiRequestTemplate = queueProvider.getTransportApiRequestTemplate();
ruleEngineMsgProducer = queueProvider.getRuleEngineMsgProducer();
tbCoreMsgProducer = queueProvider.getTbCoreMsgProducer();
transportNotificationsConsumer = queueProvider.getTransportNotificationsConsumer();
} }
public void destroy() { public void destroy() {

View File

@ -16,16 +16,14 @@
package org.thingsboard.server.common.transport.service; package org.thingsboard.server.common.transport.service;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.TbQueueCallback;
import org.thingsboard.server.TbQueueMsgMetadata;
import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
@ -43,22 +41,18 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import org.thingsboard.server.kafka.AsyncCallbackTemplate; import org.thingsboard.server.kafka.AsyncCallbackTemplate;
import org.thingsboard.server.kafka.TBKafkaAdmin;
import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
import org.thingsboard.server.kafka.TbKafkaSettings; import org.thingsboard.server.kafka.TbKafkaSettings;
import org.thingsboard.server.kafka.TbNodeIdProvider; import org.thingsboard.server.kafka.TbNodeIdProvider;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.time.Duration; import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -97,10 +91,6 @@ public class RemoteTransportService extends AbstractTransportService {
@Autowired @Autowired
private TbNodeIdProvider nodeIdProvider; private TbNodeIdProvider nodeIdProvider;
private TbKafkaRequestTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
private TBKafkaProducerTemplate<ToRuleEngineMsg> ruleEngineProducer;
private TBKafkaConsumerTemplate<ToTransportMsg> mainConsumer;
private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("remote-transport-consumer")); private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("remote-transport-consumer"));
private volatile boolean stopped = false; private volatile boolean stopped = false;
@ -109,67 +99,67 @@ public class RemoteTransportService extends AbstractTransportService {
public void init() { public void init() {
super.init(); super.init();
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaProducerTemplate.builder(); // TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings); // requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("producer-transport-api-request-" + nodeIdProvider.getNodeId()); // requestBuilder.clientId("producer-transport-api-request-" + nodeIdProvider.getNodeId());
requestBuilder.defaultTopic(transportApiRequestsTopic); // requestBuilder.defaultTopic(transportApiRequestsTopic);
requestBuilder.encoder(new TransportApiRequestEncoder()); // requestBuilder.encoder(new TransportApiRequestEncoder());
//
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaConsumerTemplate.builder(); // TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaConsumerTemplate.builder();
responseBuilder.settings(kafkaSettings); // responseBuilder.settings(kafkaSettings);
responseBuilder.topic(transportApiResponsesTopic + "." + nodeIdProvider.getNodeId()); // responseBuilder.topic(transportApiResponsesTopic + "." + nodeIdProvider.getNodeId());
responseBuilder.clientId("transport-api-client-" + nodeIdProvider.getNodeId()); // responseBuilder.clientId("transport-api-client-" + nodeIdProvider.getNodeId());
responseBuilder.groupId("transport-api-client"); // responseBuilder.groupId("transport-api-client");
responseBuilder.autoCommit(true); // responseBuilder.autoCommit(true);
responseBuilder.autoCommitIntervalMs(autoCommitInterval); // responseBuilder.autoCommitIntervalMs(autoCommitInterval);
responseBuilder.decoder(new TransportApiResponseDecoder()); // responseBuilder.decoder(new TransportApiResponseDecoder());
//
TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder // TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder
<TransportApiRequestMsg, TransportApiResponseMsg> builder = TbKafkaRequestTemplate.builder(); // <TransportApiRequestMsg, TransportApiResponseMsg> builder = TbKafkaRequestTemplate.builder();
builder.requestTemplate(requestBuilder.build()); // builder.requestTemplate(requestBuilder.build());
builder.responseTemplate(responseBuilder.build()); // builder.responseTemplate(responseBuilder.build());
builder.maxPendingRequests(maxPendingRequests); // builder.maxPendingRequests(maxPendingRequests);
builder.maxRequestTimeout(maxRequestsTimeout); // builder.maxRequestTimeout(maxRequestsTimeout);
builder.pollInterval(responsePollDuration); // builder.pollInterval(responsePollDuration);
transportApiTemplate = builder.build(); // transportApiTemplate = builder.build();
transportApiTemplate.init(); // transportApiTemplate.init();
//
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<ToRuleEngineMsg> ruleEngineProducerBuilder = TBKafkaProducerTemplate.builder(); // TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<ToRuleEngineMsg> ruleEngineProducerBuilder = TBKafkaProducerTemplate.builder();
ruleEngineProducerBuilder.settings(kafkaSettings); // ruleEngineProducerBuilder.settings(kafkaSettings);
ruleEngineProducerBuilder.clientId("producer-rule-engine-request-" + nodeIdProvider.getNodeId()); // ruleEngineProducerBuilder.clientId("producer-rule-engine-request-" + nodeIdProvider.getNodeId());
ruleEngineProducerBuilder.defaultTopic(ruleEngineTopic); // ruleEngineProducerBuilder.defaultTopic(ruleEngineTopic);
ruleEngineProducerBuilder.encoder(new ToRuleEngineMsgEncoder()); // ruleEngineProducerBuilder.encoder(new ToRuleEngineMsgEncoder());
ruleEngineProducer = ruleEngineProducerBuilder.build(); // ruleEngineProducer = ruleEngineProducerBuilder.build();
ruleEngineProducer.init(); // ruleEngineProducer.init();
//
String notificationsTopicName = notificationsTopic + "." + nodeIdProvider.getNodeId(); // String notificationsTopicName = notificationsTopic + "." + nodeIdProvider.getNodeId();
//
try { // try {
TBKafkaAdmin admin = new TBKafkaAdmin(kafkaSettings); // TBKafkaAdmin admin = new TBKafkaAdmin(kafkaSettings);
CreateTopicsResult result = admin.createTopic(new NewTopic(notificationsTopicName, 1, (short) 1)); // CreateTopicsResult result = admin.createTopic(new NewTopic(notificationsTopicName, 1, (short) 1));
result.all().get(); // result.all().get();
} catch (Exception e) { // } catch (Exception e) {
log.trace("Failed to create topic: {}", e.getMessage(), e); // log.trace("Failed to create topic: {}", e.getMessage(), e);
} // }
//
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<ToTransportMsg> mainConsumerBuilder = TBKafkaConsumerTemplate.builder(); // TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<ToTransportMsg> mainConsumerBuilder = TBKafkaConsumerTemplate.builder();
mainConsumerBuilder.settings(kafkaSettings); // mainConsumerBuilder.settings(kafkaSettings);
mainConsumerBuilder.topic(notificationsTopicName); // mainConsumerBuilder.topic(notificationsTopicName);
mainConsumerBuilder.clientId("transport-" + nodeIdProvider.getNodeId()); // mainConsumerBuilder.clientId("transport-" + nodeIdProvider.getNodeId());
mainConsumerBuilder.groupId("transport"); // mainConsumerBuilder.groupId("transport");
mainConsumerBuilder.autoCommit(true); // mainConsumerBuilder.autoCommit(true);
mainConsumerBuilder.autoCommitIntervalMs(notificationsAutoCommitInterval); // mainConsumerBuilder.autoCommitIntervalMs(notificationsAutoCommitInterval);
mainConsumerBuilder.decoder(new ToTransportMsgResponseDecoder()); // mainConsumerBuilder.decoder(new ToTransportMsgResponseDecoder());
mainConsumer = mainConsumerBuilder.build(); // mainConsumer = mainConsumerBuilder.build();
mainConsumer.subscribe(); // mainConsumer.subscribe();
mainConsumerExecutor.execute(() -> { mainConsumerExecutor.execute(() -> {
while (!stopped) { while (!stopped) {
try { try {
ConsumerRecords<String, byte[]> records = mainConsumer.poll(Duration.ofMillis(notificationsPollDuration)); List<TbProtoQueueMsg<ToTransportMsg>> records = transportNotificationsConsumer.poll(notificationsPollDuration);
records.forEach(record -> { records.forEach(record -> {
try { try {
ToTransportMsg toTransportMsg = mainConsumer.decode(record); ToTransportMsg toTransportMsg = record.getValue();
if (toTransportMsg.hasToDeviceSessionMsg()) { if (toTransportMsg.hasToDeviceSessionMsg()) {
processToTransportMsg(toTransportMsg.getToDeviceSessionMsg()); processToTransportMsg(toTransportMsg.getToDeviceSessionMsg());
} }
@ -193,12 +183,12 @@ public class RemoteTransportService extends AbstractTransportService {
public void destroy() { public void destroy() {
super.destroy(); super.destroy();
stopped = true; stopped = true;
if (transportApiTemplate != null) { // if (transportApiTemplate != null) {
transportApiTemplate.stop(); // transportApiTemplate.stop();
} // }
if (mainConsumer != null) { // if (mainConsumer != null) {
mainConsumer.unsubscribe(); // mainConsumer.unsubscribe();
} // }
if (mainConsumerExecutor != null) { if (mainConsumerExecutor != null) {
mainConsumerExecutor.shutdownNow(); mainConsumerExecutor.shutdownNow();
} }
@ -207,25 +197,25 @@ public class RemoteTransportService extends AbstractTransportService {
@Override @Override
public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) { public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
log.trace("Processing msg: {}", msg); log.trace("Processing msg: {}", msg);
AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(), TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build());
TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()), AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg),
response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor); response -> callback.onSuccess(response.getValue().getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
} }
@Override @Override
public void process(ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) { public void process(ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
log.trace("Processing msg: {}", msg); log.trace("Processing msg: {}", msg);
AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getHash(), TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build());
TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()), AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg),
response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor); response -> callback.onSuccess(response.getValue().getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
} }
@Override @Override
public void process(GetOrCreateDeviceFromGatewayRequestMsg msg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg> callback) { public void process(GetOrCreateDeviceFromGatewayRequestMsg msg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg> callback) {
log.trace("Processing msg: {}", msg); log.trace("Processing msg: {}", msg);
AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getDeviceName(), TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build());
TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build()), AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg),
response -> callback.onSuccess(response.getGetOrCreateDeviceResponseMsg()), callback::onError, transportCallbackExecutor); response -> callback.onSuccess(response.getValue().getGetOrCreateDeviceResponseMsg()), callback::onError, transportCallbackExecutor);
} }
@Override @Override
@ -322,18 +312,30 @@ public class RemoteTransportService extends AbstractTransportService {
} }
private void send(SessionInfoProto sessionInfo, ToRuleEngineMsg toRuleEngineMsg, TransportServiceCallback<Void> callback) { private void send(SessionInfoProto sessionInfo, ToRuleEngineMsg toRuleEngineMsg, TransportServiceCallback<Void> callback) {
ruleEngineProducer.send(getRoutingKey(sessionInfo), toRuleEngineMsg, (metadata, exception) -> { ruleEngineMsgProducer.send(new TbProtoQueueMsg<>(getRoutingKey(sessionInfo), toRuleEngineMsg), callback != null ?
if (callback != null) { new TransportTbQueueCallback(callback) : null);
if (exception == null) {
this.transportCallbackExecutor.submit(() -> { }
private class TransportTbQueueCallback implements TbQueueCallback {
private final TransportServiceCallback<Void> callback;
private TransportTbQueueCallback(TransportServiceCallback<Void> callback) {
this.callback = callback;
}
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
RemoteTransportService.this.transportCallbackExecutor.submit(() -> {
callback.onSuccess(null); callback.onSuccess(null);
}); });
} else { }
this.transportCallbackExecutor.submit(() -> {
callback.onError(exception); @Override
public void onFailure(Throwable t) {
RemoteTransportService.this.transportCallbackExecutor.submit(() -> {
callback.onError(t);
}); });
} }
} }
});
}
} }