diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 77a932e10d..8acca0b4c7 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -292,28 +292,38 @@ public class DefaultDeviceStateService implements DeviceStateService { } } + volatile Set pendingPartitions; + @Override public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) { if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) { synchronized (this) { + pendingPartitions = partitionChangeEvent.getPartitions(); if (!clusterUpdatePending) { clusterUpdatePending = true; queueExecutor.submit(() -> { clusterUpdatePending = false; - initStateFromDB(partitionChangeEvent.getPartitions()); + initStateFromDB(); }); } } } } - private void initStateFromDB(Set partitions) { + private void initStateFromDB() { try { - Set addedPartitions = new HashSet<>(partitions); + log.info("CURRENT PARTITIONS: {}", partitionedDevices.keySet()); + log.info("NEW PARTITIONS: {}", pendingPartitions); + + Set addedPartitions = new HashSet<>(pendingPartitions); addedPartitions.removeAll(partitionedDevices.keySet()); + log.info("ADDED PARTITIONS: {}", addedPartitions); + Set removedPartitions = new HashSet<>(partitionedDevices.keySet()); - removedPartitions.removeAll(partitions); + removedPartitions.removeAll(pendingPartitions); + + log.info("REMOVED PARTITIONS: {}", removedPartitions); // We no longer manage current partition of devices; removedPartitions.forEach(partition -> { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 608ecdefc9..ba9f9356d3 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -668,7 +668,7 @@ queue: topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" partitions: "${TB_QUEUE_CORE_PARTITIONS:10}" - pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:60000}" + pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:2000}" stats: enabled: "${TB_QUEUE_CORE_STATS_ENABLED:true}" print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:60000}" @@ -690,7 +690,7 @@ queue: rule-engine: topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" - pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}" + pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:2000}" stats: enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:true}" print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:60000}" @@ -699,7 +699,7 @@ queue: topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb_rule_engine.main}" poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}" partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}" - pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:5000}" + pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:2000}" submit-strategy: type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL # For BATCH only @@ -714,7 +714,7 @@ queue: topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}" poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}" partitions: "${TB_QUEUE_RE_HP_PARTITIONS:10}" - pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:60000}" + pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:2000}" submit-strategy: type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL # For BATCH only @@ -729,7 +729,7 @@ queue: topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}" poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}" partitions: "${TB_QUEUE_RE_SQ_PARTITIONS:10}" - pack-processing-timeout: "${TB_QUEUE_RE_SQ_PACK_PROCESSING_TIMEOUT_MS:60000}" + pack-processing-timeout: "${TB_QUEUE_RE_SQ_PACK_PROCESSING_TIMEOUT_MS:2000}" submit-strategy: type: "${TB_QUEUE_RE_SQ_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_BY_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL # For BATCH only diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java index 734c79d927..9dc5a19ecd 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusConsumerTemplate.java @@ -101,7 +101,7 @@ public class TbServiceBusConsumerTemplate extends Abstract @Override protected void doSubscribe(List topicNames) { createReceivers(); - messagesPerQueue = receivers.size() / partitions.size(); + messagesPerQueue = receivers.size() / Math.max(partitions.size(), 1); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index f3f11fa7a1..c683ed846d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -36,6 +36,7 @@ import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import javax.annotation.PostConstruct; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -148,6 +149,14 @@ public class HashPartitionService implements PartitionService { } } }); + + oldPartitions.forEach((serviceQueueKey, partitions) -> { + if (!myPartitions.containsKey(serviceQueueKey)) { + log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", serviceQueueKey); + applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceQueueKey, Collections.emptySet())); + } + }); + myPartitions.forEach((serviceQueueKey, partitions) -> { if (!partitions.equals(oldPartitions.get(serviceQueueKey))) { log.info("[{}] NEW PARTITIONS: {}", serviceQueueKey, partitions); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index de94db804d..1fbfb68e3e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -71,8 +71,12 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue @Override protected void doSubscribe(List topicNames) { - topicNames.forEach(admin::createTopicIfNotExists); - consumer.subscribe(topicNames); + if (!topicNames.isEmpty()) { + topicNames.forEach(admin::createTopicIfNotExists); + consumer.subscribe(topicNames); + } else { + consumer.unsubscribe(); + } } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java index b5b6126cd5..f02cc27ee4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java @@ -106,7 +106,7 @@ public class TbPubSubConsumerTemplate extends AbstractPara subscriptionNames = new LinkedHashSet<>(topicNames); subscriptionNames.forEach(admin::createTopicIfNotExists); initNewExecutor(subscriptionNames.size() + 1); - messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size(); + messagesPerTopic = pubSubSettings.getMaxMessages() / Math.max(subscriptionNames.size(), 1); } @Override diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 351be0865f..ac19e72037 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -294,7 +294,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement break; } } catch (Exception e) { - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); + log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e); grantedQoSList.add(FAILURE.value()); } }