diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java index 45f84337f8..4b4fec318a 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java @@ -112,7 +112,6 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ public void init() { TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder notificationsProducerBuilder = TBKafkaProducerTemplate.builder(); notificationsProducerBuilder.settings(kafkaSettings); - notificationsProducerBuilder.defaultTopic(notificationsTopic); notificationsProducerBuilder.encoder(new ToTransportMsgEncoder()); notificationsProducer = notificationsProducerBuilder.build(); diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java index 88033aa86b..365413c757 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java @@ -43,8 +43,6 @@ public class RemoteTransportApiService { @Value("${transport.remote.transport_api.requests_topic}") private String transportApiRequestsTopic; - @Value("${transport.remote.transport_api.responses_topic}") - private String transportApiResponsesTopic; @Value("${transport.remote.transport_api.max_pending_requests}") private int maxPendingRequests; @Value("${transport.remote.transport_api.request_timeout}") @@ -73,7 +71,6 @@ public class RemoteTransportApiService { TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder responseBuilder = TBKafkaProducerTemplate.builder(); responseBuilder.settings(kafkaSettings); - responseBuilder.defaultTopic(transportApiResponsesTopic); responseBuilder.encoder(new TransportApiResponseEncoder()); TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder requestBuilder = TBKafkaConsumerTemplate.builder(); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 52f6395a13..ed18a0a7c6 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -55,6 +55,8 @@ rpc: # Clustering properties related to consistent-hashing. See architecture docs for more details. cluster: + # Unique id for this node (autogenerated if empty) + node_id: "${CLUSTER_NODE_ID:}" # Name of hash function used for consistent hash ring. hash_function_name: "${CLUSTER_HASH_FUNCTION_NAME:murmur3_128}" # Amount of virtual nodes in consistent hash ring. @@ -392,7 +394,6 @@ transport: remote: transport_api: requests_topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}" - responses_topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}" max_pending_requests: "${TB_TRANSPORT_MAX_PENDING_REQUESTS:10000}" request_timeout: "${TB_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}" request_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}" diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java index f6bafef45d..7cc90a0a8b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java @@ -15,15 +15,17 @@ */ package org.thingsboard.server.kafka; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.CreateTopicsResult; -import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaFuture; import java.time.Duration; import java.util.Collections; import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Created by ashvayka on 24.09.18. @@ -36,7 +38,32 @@ public class TBKafkaAdmin { client = AdminClient.create(settings.toProps()); } + public void waitForTopic(String topic, long timeout, TimeUnit timeoutUnit) throws InterruptedException, TimeoutException { + synchronized (this) { + long timeoutExpiredMs = System.currentTimeMillis() + timeoutUnit.toMillis(timeout); + while (!topicExists(topic)) { + long waitMs = timeoutExpiredMs - System.currentTimeMillis(); + if (waitMs <= 0) { + throw new TimeoutException("Timeout occurred while waiting for topic [" + topic + "] to be available!"); + } else { + wait(1000); + } + } + } + } + public CreateTopicsResult createTopic(NewTopic topic){ return client.createTopics(Collections.singletonList(topic)); } + + private boolean topicExists(String topic) throws InterruptedException { + KafkaFuture topicDescriptionFuture = client.describeTopics(Collections.singleton(topic)).values().get(topic); + try { + topicDescriptionFuture.get(); + return true; + } catch (ExecutionException e) { + return false; + } + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java index ee652f455c..6cf507fe59 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java @@ -20,14 +20,17 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.header.Header; +import org.springframework.util.StringUtils; import java.util.List; import java.util.Properties; @@ -35,6 +38,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; /** * Created by ashvayka on 24.09.18. @@ -71,21 +75,19 @@ public class TBKafkaProducerTemplate { } public void init() { - try { - TBKafkaAdmin admin = new TBKafkaAdmin(this.settings); - CreateTopicsResult result = admin.createTopic(new NewTopic(defaultTopic, 100, (short) 1)); - result.all().get(); - } catch (Exception e) { - if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) { - log.trace("[{}] Topic already exists.", defaultTopic); - } else { - log.info("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e); + this.partitionInfoMap = new ConcurrentHashMap<>(); + if (!StringUtils.isEmpty(defaultTopic)) { + try { + TBKafkaAdmin admin = new TBKafkaAdmin(this.settings); + admin.waitForTopic(defaultTopic, 30, TimeUnit.SECONDS); + log.info("[{}] Topic exists.", defaultTopic); + } catch (Exception e) { + log.info("[{}] Failed to wait for topic: {}", defaultTopic, e.getMessage(), e); throw new RuntimeException(e); } + //Maybe this should not be cached, but we don't plan to change size of partitions + this.partitionInfoMap.putIfAbsent(defaultTopic, producer.partitionsFor(defaultTopic)); } - //Maybe this should not be cached, but we don't plan to change size of partitions - this.partitionInfoMap = new ConcurrentHashMap<>(); - this.partitionInfoMap.putIfAbsent(defaultTopic, producer.partitionsFor(defaultTopic)); } T enrich(T value, String responseTopic, UUID requestId) { @@ -105,7 +107,11 @@ public class TBKafkaProducerTemplate { } public Future send(String key, T value, Long timestamp, Iterable
headers, Callback callback) { - return send(this.defaultTopic, key, value, timestamp, headers, callback); + if (!StringUtils.isEmpty(this.defaultTopic)) { + return send(this.defaultTopic, key, value, timestamp, headers, callback); + } else { + throw new RuntimeException("Failed to send message! Default topic is not specified!"); + } } public Future send(String topic, String key, T value, Iterable
headers, Callback callback) { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 267d56002d..944ed66880 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -56,6 +56,7 @@ services: max-file: "30" environment: TB_HOST: tb1 + CLUSTER_NODE_ID: tb1 env_file: - tb-node.env volumes: @@ -77,6 +78,7 @@ services: max-file: "30" environment: TB_HOST: tb2 + CLUSTER_NODE_ID: tb2 env_file: - tb-node.env volumes: @@ -93,6 +95,7 @@ services: - "1883" environment: TB_HOST: tb-mqtt-transport1 + CLUSTER_NODE_ID: tb-mqtt-transport1 env_file: - tb-mqtt-transport.env volumes: @@ -107,6 +110,7 @@ services: - "1883" environment: TB_HOST: tb-mqtt-transport2 + CLUSTER_NODE_ID: tb-mqtt-transport2 env_file: - tb-mqtt-transport.env volumes: @@ -121,6 +125,7 @@ services: - "8081" environment: TB_HOST: tb-http-transport1 + CLUSTER_NODE_ID: tb-http-transport1 env_file: - tb-http-transport.env volumes: @@ -135,6 +140,7 @@ services: - "8081" environment: TB_HOST: tb-http-transport2 + CLUSTER_NODE_ID: tb-http-transport2 env_file: - tb-http-transport.env volumes: @@ -149,6 +155,7 @@ services: - "5683:5683/udp" environment: TB_HOST: tb-coap-transport + CLUSTER_NODE_ID: tb-coap-transport env_file: - tb-coap-transport.env volumes: diff --git a/docker/kafka.env b/docker/kafka.env index 87dad076d2..485b3c0169 100644 --- a/docker/kafka.env +++ b/docker/kafka.env @@ -4,7 +4,7 @@ KAFKA_LISTENERS=INSIDE://:9093,OUTSIDE://:9092 KAFKA_ADVERTISED_LISTENERS=INSIDE://:9093,OUTSIDE://kafka:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE -KAFKA_CREATE_TOPICS=js.eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.transport.api.requests:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.rule-engine:30:1 +KAFKA_CREATE_TOPICS=js.eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.transport.api.requests:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.rule-engine:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600 KAFKA_AUTO_CREATE_TOPICS_ENABLE=false KAFKA_LOG_RETENTION_BYTES=1073741824 KAFKA_LOG_SEGMENT_BYTES=268435456 diff --git a/docker/tb-node.env b/docker/tb-node.env index ca945ab6ed..963943dccd 100644 --- a/docker/tb-node.env +++ b/docker/tb-node.env @@ -8,3 +8,5 @@ JS_EVALUATOR=remote TRANSPORT_TYPE=remote CACHE_TYPE=redis REDIS_HOST=redis + +HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE=false diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 9918963a1c..324f3cf828 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -66,7 +66,7 @@ public class MqttClientTest extends AbstractContainerTest { WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS); MqttClient mqttClient = getMqttClient(deviceCredentials, null); - mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload().toString().getBytes())); + mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload().toString().getBytes())).get(); WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); log.info("Received telemetry: {}", actualLatestTelemetry); wsClient.closeBlocking(); @@ -93,7 +93,7 @@ public class MqttClientTest extends AbstractContainerTest { WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS); MqttClient mqttClient = getMqttClient(deviceCredentials, null); - mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload(ts).toString().getBytes())); + mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload(ts).toString().getBytes())).get(); WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); log.info("Received telemetry: {}", actualLatestTelemetry); wsClient.closeBlocking(); @@ -123,7 +123,7 @@ public class MqttClientTest extends AbstractContainerTest { clientAttributes.addProperty("attr2", true); clientAttributes.addProperty("attr3", 42.0); clientAttributes.addProperty("attr4", 73); - mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes())); + mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes())).get(); WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); log.info("Received telemetry: {}", actualLatestTelemetry); wsClient.closeBlocking(); @@ -146,6 +146,7 @@ public class MqttClientTest extends AbstractContainerTest { Device device = createDevice("mqtt_"); DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId()); + WsClient wsClient = subscribeToWebSocket(device.getId(), "CLIENT_SCOPE", CmdsType.ATTR_SUB_CMDS); MqttMessageListener listener = new MqttMessageListener(); MqttClient mqttClient = getMqttClient(deviceCredentials, listener); @@ -153,7 +154,17 @@ public class MqttClientTest extends AbstractContainerTest { JsonObject clientAttributes = new JsonObject(); String clientAttributeValue = RandomStringUtils.randomAlphanumeric(8); clientAttributes.addProperty("clientAttr", clientAttributeValue); - mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes())); + mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes())).get(); + + WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); + log.info("Received ws telemetry: {}", actualLatestTelemetry); + wsClient.closeBlocking(); + + Assert.assertEquals(1, actualLatestTelemetry.getData().size()); + Assert.assertEquals(Sets.newHashSet("clientAttr"), + actualLatestTelemetry.getLatestValues().keySet()); + + Assert.assertTrue(verify(actualLatestTelemetry, "clientAttr", clientAttributeValue)); // Add a new shared attribute JsonObject sharedAttributes = new JsonObject(); @@ -166,12 +177,16 @@ public class MqttClientTest extends AbstractContainerTest { Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); // Subscribe to attributes response - mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE); + mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE).get(); + + // Wait until subscription is processed + TimeUnit.SECONDS.sleep(3); + // Request attributes JsonObject request = new JsonObject(); request.addProperty("clientKeys", "clientAttr"); request.addProperty("sharedKeys", "sharedAttr"); - mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes())); + mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes())).get(); MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class); log.info("Received telemetry: {}", attributes); @@ -193,7 +208,10 @@ public class MqttClientTest extends AbstractContainerTest { MqttMessageListener listener = new MqttMessageListener(); MqttClient mqttClient = getMqttClient(deviceCredentials, listener); - mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE); + mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(); + + // Wait until subscription is processed + TimeUnit.SECONDS.sleep(3); String sharedAttributeName = "sharedAttr"; @@ -236,7 +254,10 @@ public class MqttClientTest extends AbstractContainerTest { MqttMessageListener listener = new MqttMessageListener(); MqttClient mqttClient = getMqttClient(deviceCredentials, listener); - mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE); + mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get(); + + // Wait until subscription is processed + TimeUnit.SECONDS.sleep(3); // Send an RPC from the server JsonObject serverRpcPayload = new JsonObject(); @@ -263,7 +284,7 @@ public class MqttClientTest extends AbstractContainerTest { JsonObject clientResponse = new JsonObject(); clientResponse.addProperty("response", "someResponse"); // Send a response to the server's RPC request - mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())); + mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get(); ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS); Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful()); @@ -280,7 +301,7 @@ public class MqttClientTest extends AbstractContainerTest { MqttMessageListener listener = new MqttMessageListener(); MqttClient mqttClient = getMqttClient(deviceCredentials, listener); - mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE); + mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get(); // Get the default rule chain id to make it root again after test finished RuleChainId defaultRuleChainId = getDefaultRuleChainId(); @@ -294,7 +315,7 @@ public class MqttClientTest extends AbstractContainerTest { clientRequest.addProperty("method", "getResponse"); clientRequest.addProperty("params", true); Integer requestId = 42; - mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes())); + mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes())).get(); // Check the response from the server TimeUnit.SECONDS.sleep(1); diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 95e7b7571b..17f996e15a 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -17,7 +17,12 @@ spring.main.web-environment: false spring.main.web-application-type: none -# MQTT server parameters +# Clustering properties +cluster: + # Unique id for this node (autogenerated if empty) + node_id: "${CLUSTER_NODE_ID:}" + +# COAP server parameters transport: coap: bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index bb1d3c4888..31e0194d86 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -20,6 +20,11 @@ server: # Server bind port port: "${HTTP_BIND_PORT:8081}" +# Clustering properties +cluster: + # Unique id for this node (autogenerated if empty) + node_id: "${CLUSTER_NODE_ID:}" + # HTTP server parameters transport: http: diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 719530c73f..46e9eb9c41 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -17,6 +17,11 @@ spring.main.web-environment: false spring.main.web-application-type: none +# Clustering properties +cluster: + # Unique id for this node (autogenerated if empty) + node_id: "${CLUSTER_NODE_ID:}" + # MQTT server parameters transport: mqtt: