MqttHandler - processAsync (required for AbstractMqttIntegration)

This commit is contained in:
Sergey Matvienko 2023-06-20 14:26:30 +02:00
parent 7e27c5b683
commit d74e0c45df
3 changed files with 8 additions and 3 deletions

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.msa.connectivity; package org.thingsboard.server.msa.connectivity;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
@ -493,9 +494,10 @@ public class MqttClientTest extends AbstractContainerTest {
} }
@Override @Override
public void onMessage(String topic, ByteBuf message) { public ListenableFuture<Void> onMessage(String topic, ByteBuf message) {
log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic); log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic);
events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8))); events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8)));
return Futures.immediateVoidFuture();
} }
} }

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.msa.connectivity; package org.thingsboard.server.msa.connectivity;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
@ -435,9 +436,10 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
} }
@Override @Override
public void onMessage(String topic, ByteBuf message) { public ListenableFuture<Void> onMessage(String topic, ByteBuf message) {
log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic); log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic);
events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8))); events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8)));
return Futures.immediateVoidFuture();
} }
} }

View File

@ -15,9 +15,10 @@
*/ */
package org.thingsboard.mqtt; package org.thingsboard.mqtt;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
public interface MqttHandler { public interface MqttHandler {
void onMessage(String topic, ByteBuf payload); ListenableFuture<Void> onMessage(String topic, ByteBuf payload);
} }