diff --git a/common/queue/pom.xml b/common/queue/pom.xml index 8f0a0913a4..952b3c540d 100644 --- a/common/queue/pom.xml +++ b/common/queue/pom.xml @@ -94,6 +94,10 @@ mockito-all test + + com.google.protobuf + protobuf-java + diff --git a/common/queue/src/main/java/org/thingsboard/server/TbQueueRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/TbQueueRequestTemplate.java index 85b5cc3f15..5ba28f22c5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/TbQueueRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/TbQueueRequestTemplate.java @@ -4,6 +4,6 @@ import com.google.common.util.concurrent.ListenableFuture; public interface TbQueueRequestTemplate { - ListenableFuture send(String key, Request request); + ListenableFuture send(Request request); } diff --git a/common/queue/src/main/java/org/thingsboard/server/common/DefaultTbQueueMsgHeaders.java b/common/queue/src/main/java/org/thingsboard/server/common/DefaultTbQueueMsgHeaders.java new file mode 100644 index 0000000000..4ef33a489d --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/common/DefaultTbQueueMsgHeaders.java @@ -0,0 +1,21 @@ +package org.thingsboard.server.common; + +import org.thingsboard.server.TbQueueMsgHeaders; + +import java.util.HashMap; +import java.util.Map; + +public class DefaultTbQueueMsgHeaders implements TbQueueMsgHeaders { + + private final Map data = new HashMap<>(); + + @Override + public byte[] put(String key, byte[] value) { + return data.put(key, value); + } + + @Override + public byte[] get(String key) { + return data.get(key); + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/common/DefaultTbQueueRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/common/DefaultTbQueueRequestTemplate.java index 2965b7a291..49022ad2e3 100644 --- a/common/queue/src/main/java/org/thingsboard/server/common/DefaultTbQueueRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/common/DefaultTbQueueRequestTemplate.java @@ -133,7 +133,7 @@ public class DefaultTbQueueRequestTemplate send(String key, Request request) { + public ListenableFuture send(Request request) { if (tickSize > maxPendingRequests) { return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!")); } @@ -143,7 +143,7 @@ public class DefaultTbQueueRequestTemplate future = SettableFuture.create(); ResponseMetaData responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future); pendingRequests.putIfAbsent(requestId, responseMetaData); - log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, key, responseMetaData.expTime); + log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime); requestTemplate.send(request, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { diff --git a/common/queue/src/main/java/org/thingsboard/server/common/TbProtoQueueMsg.java b/common/queue/src/main/java/org/thingsboard/server/common/TbProtoQueueMsg.java new file mode 100644 index 0000000000..f1120b876f --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/common/TbProtoQueueMsg.java @@ -0,0 +1,40 @@ +package org.thingsboard.server.common; + +import lombok.Data; +import org.thingsboard.server.TbQueueMsg; +import org.thingsboard.server.TbQueueMsgHeaders; + +import java.util.UUID; + +@Data +public class TbProtoQueueMsg implements TbQueueMsg { + + private final UUID key; + private final T value; + private final DefaultTbQueueMsgHeaders headers; + + public TbProtoQueueMsg(UUID key, T value) { + this(key, value, new DefaultTbQueueMsgHeaders()); + } + + public TbProtoQueueMsg(UUID key, T value, DefaultTbQueueMsgHeaders headers) { + this.key = key; + this.value = value; + this.headers = headers; + } + + @Override + public UUID getKey() { + return key; + } + + @Override + public TbQueueMsgHeaders getHeaders() { + return headers; + } + + @Override + public byte[] getData() { + return value.toByteArray(); + } +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/KafkaTransportQueueProvider.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/KafkaTransportQueueProvider.java new file mode 100644 index 0000000000..2aa6f90b7b --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/KafkaTransportQueueProvider.java @@ -0,0 +1,31 @@ +package org.thingsboard.server.common.transport.queue; + +import org.springframework.stereotype.Component; +import org.thingsboard.server.TbQueueConsumer; +import org.thingsboard.server.TbQueueProducer; +import org.thingsboard.server.TbQueueRequestTemplate; +import org.thingsboard.server.common.TbProtoQueueMsg; +import org.thingsboard.server.gen.transport.TransportProtos; + +@Component +public class KafkaTransportQueueProvider implements TransportQueueProvider { + @Override + public TbQueueRequestTemplate, TbProtoQueueMsg> getTransportApiRequestTemplate() { + return null; + } + + @Override + public TbQueueProducer> getRuleEngineMsgProducer() { + return null; + } + + @Override + public TbQueueProducer> getTbCoreMsgProducer() { + return null; + } + + @Override + public TbQueueConsumer> getTransportNotificationsConsumer() { + return null; + } +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/TransportApiCall.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/TransportApiCall.java deleted file mode 100644 index 956c1138f1..0000000000 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/TransportApiCall.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.thingsboard.server.common.transport.queue; - -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.UUID; - -public abstract class TransportApiCall { - - protected byte[] uuidToBytes(UUID uuid) { - ByteBuffer buf = ByteBuffer.allocate(16); - buf.putLong(uuid.getMostSignificantBits()); - buf.putLong(uuid.getLeastSignificantBits()); - return buf.array(); - } - - protected static UUID bytesToUuid(byte[] bytes) { - ByteBuffer bb = ByteBuffer.wrap(bytes); - long firstLong = bb.getLong(); - long secondLong = bb.getLong(); - return new UUID(firstLong, secondLong); - } - - protected byte[] stringToBytes(String string) { - return string.getBytes(StandardCharsets.UTF_8); - } - - protected String bytesToString(byte[] data) { - return new String(data, StandardCharsets.UTF_8); - } - -} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/TransportApiCallRequest.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/TransportApiCallRequest.java deleted file mode 100644 index 49b83e2c17..0000000000 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/TransportApiCallRequest.java +++ /dev/null @@ -1,40 +0,0 @@ -package org.thingsboard.server.common.transport.queue; - -import org.thingsboard.server.TbQueueMsg; -import org.thingsboard.server.gen.transport.TransportProtos; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -public class TransportApiCallRequest extends TransportApiCall implements TbQueueMsg { - public static final String REQUEST_ID_HEADER = "requestId"; - public static final String RESPONSE_TOPIC_HEADER = "responseTopic"; - - private final UUID requestId; - private final Map headers; - private final TransportProtos.TransportApiRequestMsg msg; - - public TransportApiCallRequest(UUID requestId, String responseTopic, TransportProtos.TransportApiRequestMsg msg) { - this.requestId = requestId; - this.headers = new HashMap<>(); - this.headers.put(REQUEST_ID_HEADER, uuidToBytes(requestId)); - this.headers.put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTopic)); - this.msg = msg; - } - - @Override - public UUID getKey() { - return requestId; - } - - @Override - public Map getHeaders() { - return null; - } - - @Override - public byte[] getData() { - return msg.toByteArray(); - } -} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/TransportQueueProvider.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/TransportQueueProvider.java index d4ce11afec..d0bbafc615 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/TransportQueueProvider.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/TransportQueueProvider.java @@ -1,19 +1,22 @@ package org.thingsboard.server.common.transport.queue; import org.thingsboard.server.TbQueueConsumer; -import org.thingsboard.server.TbQueueMsg; import org.thingsboard.server.TbQueueProducer; +import org.thingsboard.server.TbQueueRequestTemplate; +import org.thingsboard.server.common.TbProtoQueueMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; public interface TransportQueueProvider { - TbQueueProducer getTransportApiCallRequestsProducer(); + TbQueueRequestTemplate, TbProtoQueueMsg> getTransportApiRequestTemplate(); - TbQueueConsumer getTransportApiCallResponsesConsumer(); + TbQueueProducer> getRuleEngineMsgProducer(); - TbQueueProducer getRuleEngineMsgProducer(); + TbQueueProducer> getTbCoreMsgProducer(); - TbQueueProducer getTbCoreMsgProducer(); - - TbQueueConsumer getTransportNotificationsConsumer(); + TbQueueConsumer> getTransportNotificationsConsumer(); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java index 0aaa8cef68..1a66ea9601 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,6 +19,10 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.TbQueueConsumer; +import org.thingsboard.server.TbQueueProducer; +import org.thingsboard.server.TbQueueRequestTemplate; +import org.thingsboard.server.common.TbProtoQueueMsg; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; @@ -29,6 +33,10 @@ import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.queue.TransportQueueProvider; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import java.util.Random; import java.util.UUID; @@ -54,6 +62,15 @@ public abstract class AbstractTransportService implements TransportService { @Autowired private TransportQueueProvider queueProvider; + + protected TbQueueRequestTemplate, TbProtoQueueMsg> transportApiRequestTemplate; + + protected TbQueueProducer> ruleEngineMsgProducer; + + protected TbQueueProducer> tbCoreMsgProducer; + + protected TbQueueConsumer> transportNotificationsConsumer; + protected ScheduledExecutorService schedulerExecutor; protected ExecutorService transportCallbackExecutor; @@ -296,8 +313,8 @@ public abstract class AbstractTransportService implements TransportService { return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); } - protected String getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) { - return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString(); + protected UUID getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) { + return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()); } public void init() { @@ -309,6 +326,10 @@ public abstract class AbstractTransportService implements TransportService { this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler")); this.transportCallbackExecutor = Executors.newWorkStealingPool(20); this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); + transportApiRequestTemplate = queueProvider.getTransportApiRequestTemplate(); + ruleEngineMsgProducer = queueProvider.getRuleEngineMsgProducer(); + tbCoreMsgProducer = queueProvider.getTbCoreMsgProducer(); + transportNotificationsConsumer = queueProvider.getTransportNotificationsConsumer(); } public void destroy() { diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java index 17031791d5..58685db764 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,16 +16,14 @@ package org.thingsboard.server.common.transport.service; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.admin.CreateTopicsResult; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.TbQueueCallback; +import org.thingsboard.server.TbQueueMsgMetadata; +import org.thingsboard.server.common.TbProtoQueueMsg; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; @@ -43,22 +41,18 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; -import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; import org.thingsboard.server.kafka.AsyncCallbackTemplate; -import org.thingsboard.server.kafka.TBKafkaAdmin; -import org.thingsboard.server.kafka.TBKafkaConsumerTemplate; -import org.thingsboard.server.kafka.TBKafkaProducerTemplate; -import org.thingsboard.server.kafka.TbKafkaRequestTemplate; import org.thingsboard.server.kafka.TbKafkaSettings; import org.thingsboard.server.kafka.TbNodeIdProvider; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.time.Duration; +import java.util.List; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -97,10 +91,6 @@ public class RemoteTransportService extends AbstractTransportService { @Autowired private TbNodeIdProvider nodeIdProvider; - private TbKafkaRequestTemplate transportApiTemplate; - private TBKafkaProducerTemplate ruleEngineProducer; - private TBKafkaConsumerTemplate mainConsumer; - private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("remote-transport-consumer")); private volatile boolean stopped = false; @@ -109,67 +99,67 @@ public class RemoteTransportService extends AbstractTransportService { public void init() { super.init(); - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder requestBuilder = TBKafkaProducerTemplate.builder(); - requestBuilder.settings(kafkaSettings); - requestBuilder.clientId("producer-transport-api-request-" + nodeIdProvider.getNodeId()); - requestBuilder.defaultTopic(transportApiRequestsTopic); - requestBuilder.encoder(new TransportApiRequestEncoder()); - - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder responseBuilder = TBKafkaConsumerTemplate.builder(); - responseBuilder.settings(kafkaSettings); - responseBuilder.topic(transportApiResponsesTopic + "." + nodeIdProvider.getNodeId()); - responseBuilder.clientId("transport-api-client-" + nodeIdProvider.getNodeId()); - responseBuilder.groupId("transport-api-client"); - responseBuilder.autoCommit(true); - responseBuilder.autoCommitIntervalMs(autoCommitInterval); - responseBuilder.decoder(new TransportApiResponseDecoder()); - - TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder - builder = TbKafkaRequestTemplate.builder(); - builder.requestTemplate(requestBuilder.build()); - builder.responseTemplate(responseBuilder.build()); - builder.maxPendingRequests(maxPendingRequests); - builder.maxRequestTimeout(maxRequestsTimeout); - builder.pollInterval(responsePollDuration); - transportApiTemplate = builder.build(); - transportApiTemplate.init(); - - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder ruleEngineProducerBuilder = TBKafkaProducerTemplate.builder(); - ruleEngineProducerBuilder.settings(kafkaSettings); - ruleEngineProducerBuilder.clientId("producer-rule-engine-request-" + nodeIdProvider.getNodeId()); - ruleEngineProducerBuilder.defaultTopic(ruleEngineTopic); - ruleEngineProducerBuilder.encoder(new ToRuleEngineMsgEncoder()); - ruleEngineProducer = ruleEngineProducerBuilder.build(); - ruleEngineProducer.init(); - - String notificationsTopicName = notificationsTopic + "." + nodeIdProvider.getNodeId(); - - try { - TBKafkaAdmin admin = new TBKafkaAdmin(kafkaSettings); - CreateTopicsResult result = admin.createTopic(new NewTopic(notificationsTopicName, 1, (short) 1)); - result.all().get(); - } catch (Exception e) { - log.trace("Failed to create topic: {}", e.getMessage(), e); - } - - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder mainConsumerBuilder = TBKafkaConsumerTemplate.builder(); - mainConsumerBuilder.settings(kafkaSettings); - mainConsumerBuilder.topic(notificationsTopicName); - mainConsumerBuilder.clientId("transport-" + nodeIdProvider.getNodeId()); - mainConsumerBuilder.groupId("transport"); - mainConsumerBuilder.autoCommit(true); - mainConsumerBuilder.autoCommitIntervalMs(notificationsAutoCommitInterval); - mainConsumerBuilder.decoder(new ToTransportMsgResponseDecoder()); - mainConsumer = mainConsumerBuilder.build(); - mainConsumer.subscribe(); +// TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder requestBuilder = TBKafkaProducerTemplate.builder(); +// requestBuilder.settings(kafkaSettings); +// requestBuilder.clientId("producer-transport-api-request-" + nodeIdProvider.getNodeId()); +// requestBuilder.defaultTopic(transportApiRequestsTopic); +// requestBuilder.encoder(new TransportApiRequestEncoder()); +// +// TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder responseBuilder = TBKafkaConsumerTemplate.builder(); +// responseBuilder.settings(kafkaSettings); +// responseBuilder.topic(transportApiResponsesTopic + "." + nodeIdProvider.getNodeId()); +// responseBuilder.clientId("transport-api-client-" + nodeIdProvider.getNodeId()); +// responseBuilder.groupId("transport-api-client"); +// responseBuilder.autoCommit(true); +// responseBuilder.autoCommitIntervalMs(autoCommitInterval); +// responseBuilder.decoder(new TransportApiResponseDecoder()); +// +// TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder +// builder = TbKafkaRequestTemplate.builder(); +// builder.requestTemplate(requestBuilder.build()); +// builder.responseTemplate(responseBuilder.build()); +// builder.maxPendingRequests(maxPendingRequests); +// builder.maxRequestTimeout(maxRequestsTimeout); +// builder.pollInterval(responsePollDuration); +// transportApiTemplate = builder.build(); +// transportApiTemplate.init(); +// +// TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder ruleEngineProducerBuilder = TBKafkaProducerTemplate.builder(); +// ruleEngineProducerBuilder.settings(kafkaSettings); +// ruleEngineProducerBuilder.clientId("producer-rule-engine-request-" + nodeIdProvider.getNodeId()); +// ruleEngineProducerBuilder.defaultTopic(ruleEngineTopic); +// ruleEngineProducerBuilder.encoder(new ToRuleEngineMsgEncoder()); +// ruleEngineProducer = ruleEngineProducerBuilder.build(); +// ruleEngineProducer.init(); +// +// String notificationsTopicName = notificationsTopic + "." + nodeIdProvider.getNodeId(); +// +// try { +// TBKafkaAdmin admin = new TBKafkaAdmin(kafkaSettings); +// CreateTopicsResult result = admin.createTopic(new NewTopic(notificationsTopicName, 1, (short) 1)); +// result.all().get(); +// } catch (Exception e) { +// log.trace("Failed to create topic: {}", e.getMessage(), e); +// } +// +// TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder mainConsumerBuilder = TBKafkaConsumerTemplate.builder(); +// mainConsumerBuilder.settings(kafkaSettings); +// mainConsumerBuilder.topic(notificationsTopicName); +// mainConsumerBuilder.clientId("transport-" + nodeIdProvider.getNodeId()); +// mainConsumerBuilder.groupId("transport"); +// mainConsumerBuilder.autoCommit(true); +// mainConsumerBuilder.autoCommitIntervalMs(notificationsAutoCommitInterval); +// mainConsumerBuilder.decoder(new ToTransportMsgResponseDecoder()); +// mainConsumer = mainConsumerBuilder.build(); +// mainConsumer.subscribe(); mainConsumerExecutor.execute(() -> { while (!stopped) { try { - ConsumerRecords records = mainConsumer.poll(Duration.ofMillis(notificationsPollDuration)); + List> records = transportNotificationsConsumer.poll(notificationsPollDuration); records.forEach(record -> { try { - ToTransportMsg toTransportMsg = mainConsumer.decode(record); + ToTransportMsg toTransportMsg = record.getValue(); if (toTransportMsg.hasToDeviceSessionMsg()) { processToTransportMsg(toTransportMsg.getToDeviceSessionMsg()); } @@ -193,12 +183,12 @@ public class RemoteTransportService extends AbstractTransportService { public void destroy() { super.destroy(); stopped = true; - if (transportApiTemplate != null) { - transportApiTemplate.stop(); - } - if (mainConsumer != null) { - mainConsumer.unsubscribe(); - } +// if (transportApiTemplate != null) { +// transportApiTemplate.stop(); +// } +// if (mainConsumer != null) { +// mainConsumer.unsubscribe(); +// } if (mainConsumerExecutor != null) { mainConsumerExecutor.shutdownNow(); } @@ -207,25 +197,25 @@ public class RemoteTransportService extends AbstractTransportService { @Override public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback callback) { log.trace("Processing msg: {}", msg); - AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(), - TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()), - response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor); + TbProtoQueueMsg protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()); + AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg), + response -> callback.onSuccess(response.getValue().getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor); } @Override public void process(ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback callback) { log.trace("Processing msg: {}", msg); - AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getHash(), - TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()), - response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor); + TbProtoQueueMsg protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()); + AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg), + response -> callback.onSuccess(response.getValue().getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor); } @Override public void process(GetOrCreateDeviceFromGatewayRequestMsg msg, TransportServiceCallback callback) { log.trace("Processing msg: {}", msg); - AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getDeviceName(), - TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build()), - response -> callback.onSuccess(response.getGetOrCreateDeviceResponseMsg()), callback::onError, transportCallbackExecutor); + TbProtoQueueMsg protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build()); + AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg), + response -> callback.onSuccess(response.getValue().getGetOrCreateDeviceResponseMsg()), callback::onError, transportCallbackExecutor); } @Override @@ -322,18 +312,30 @@ public class RemoteTransportService extends AbstractTransportService { } private void send(SessionInfoProto sessionInfo, ToRuleEngineMsg toRuleEngineMsg, TransportServiceCallback callback) { - ruleEngineProducer.send(getRoutingKey(sessionInfo), toRuleEngineMsg, (metadata, exception) -> { - if (callback != null) { - if (exception == null) { - this.transportCallbackExecutor.submit(() -> { - callback.onSuccess(null); - }); - } else { - this.transportCallbackExecutor.submit(() -> { - callback.onError(exception); - }); - } - } - }); + ruleEngineMsgProducer.send(new TbProtoQueueMsg<>(getRoutingKey(sessionInfo), toRuleEngineMsg), callback != null ? + new TransportTbQueueCallback(callback) : null); + + } + + private class TransportTbQueueCallback implements TbQueueCallback { + private final TransportServiceCallback callback; + + private TransportTbQueueCallback(TransportServiceCallback callback) { + this.callback = callback; + } + + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + RemoteTransportService.this.transportCallbackExecutor.submit(() -> { + callback.onSuccess(null); + }); + } + + @Override + public void onFailure(Throwable t) { + RemoteTransportService.this.transportCallbackExecutor.submit(() -> { + callback.onError(t); + }); + } } }