Single PartitionChangeEvent after recalculatePartitions
This commit is contained in:
parent
cf285e25ad
commit
0d8a2549d9
@ -121,7 +121,7 @@ public class DefaultActorService extends TbApplicationEventListener<PartitionCha
|
|||||||
@Override
|
@Override
|
||||||
protected void onTbApplicationEvent(PartitionChangeEvent event) {
|
protected void onTbApplicationEvent(PartitionChangeEvent event) {
|
||||||
log.info("Received partition change event.");
|
log.info("Received partition change event.");
|
||||||
this.appActor.tellWithHighPriority(new PartitionChangeMsg(event.getQueueKey().getType(), event.getPartitions()));
|
this.appActor.tellWithHighPriority(new PartitionChangeMsg(event.getServiceType()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@PreDestroy
|
@PreDestroy
|
||||||
|
|||||||
@ -183,19 +183,21 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
|||||||
@Override
|
@Override
|
||||||
protected void onTbApplicationEvent(PartitionChangeEvent event) {
|
protected void onTbApplicationEvent(PartitionChangeEvent event) {
|
||||||
if (event.getServiceType().equals(getServiceType())) {
|
if (event.getServiceType().equals(getServiceType())) {
|
||||||
String serviceQueue = event.getQueueKey().getQueueName();
|
event.getPartitionsMap().forEach((queueKey, partitions) -> {
|
||||||
log.info("[{}] Subscribing to partitions: {}", serviceQueue, event.getPartitions());
|
String serviceQueue = queueKey.getQueueName();
|
||||||
Queue configuration = consumerConfigurations.get(event.getQueueKey());
|
log.info("[{}] Subscribing to partitions: {}", serviceQueue, partitions);
|
||||||
|
Queue configuration = consumerConfigurations.get(queueKey);
|
||||||
if (configuration == null) {
|
if (configuration == null) {
|
||||||
log.warn("Received invalid partition change event for {} that is not managed by this service", event.getQueueKey());
|
log.warn("Received invalid partition change event for {} that is not managed by this service", queueKey);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (!configuration.isConsumerPerPartition()) {
|
if (!configuration.isConsumerPerPartition()) {
|
||||||
consumers.get(event.getQueueKey()).subscribe(event.getPartitions());
|
consumers.get(queueKey).subscribe(partitions);
|
||||||
} else {
|
} else {
|
||||||
log.info("[{}] Subscribing consumer per partition: {}", serviceQueue, event.getPartitions());
|
log.info("[{}] Subscribing consumer per partition: {}", serviceQueue, partitions);
|
||||||
subscribeConsumerPerPartition(event.getQueueKey(), event.getPartitions());
|
subscribeConsumerPerPartition(queueKey, partitions);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -286,10 +286,8 @@ public class HashPartitionServiceTest {
|
|||||||
HashPartitionService partitionService_common = createPartitionService();
|
HashPartitionService partitionService_common = createPartitionService();
|
||||||
partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine));
|
partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine));
|
||||||
verifyPartitionChangeEvent(event -> {
|
verifyPartitionChangeEvent(event -> {
|
||||||
return event.getQueueKey().getTenantId().isSysTenantId() &&
|
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TenantId.SYS_TENANT_ID);
|
||||||
event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) &&
|
return event.getPartitionsMap().get(queueKey).size() == systemQueue.getPartitions();
|
||||||
event.getPartitions().stream().map(TopicPartitionInfo::getPartition).collect(Collectors.toSet())
|
|
||||||
.size() == systemQueue.getPartitions();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
Mockito.reset(applicationEventPublisher);
|
Mockito.reset(applicationEventPublisher);
|
||||||
@ -316,18 +314,15 @@ public class HashPartitionServiceTest {
|
|||||||
partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine));
|
partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine));
|
||||||
// expecting event about no partitions for isolated queue key
|
// expecting event about no partitions for isolated queue key
|
||||||
verifyPartitionChangeEvent(event -> {
|
verifyPartitionChangeEvent(event -> {
|
||||||
return event.getQueueKey().getTenantId().equals(tenantId) &&
|
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId);
|
||||||
event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) &&
|
return event.getPartitionsMap().get(queueKey).isEmpty();
|
||||||
event.getPartitions().isEmpty();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
partitionService_dedicated.updateQueue(queueUpdateMsg);
|
partitionService_dedicated.updateQueue(queueUpdateMsg);
|
||||||
partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine));
|
partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine));
|
||||||
verifyPartitionChangeEvent(event -> {
|
verifyPartitionChangeEvent(event -> {
|
||||||
return event.getQueueKey().getTenantId().equals(tenantId) &&
|
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId);
|
||||||
event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) &&
|
return event.getPartitionsMap().get(queueKey).size() == isolatedQueue.getPartitions();
|
||||||
event.getPartitions().stream().map(TopicPartitionInfo::getPartition).collect(Collectors.toSet())
|
|
||||||
.size() == isolatedQueue.getPartitions();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
@ -345,9 +340,8 @@ public class HashPartitionServiceTest {
|
|||||||
partitionService_dedicated.removeQueue(queueDeleteMsg);
|
partitionService_dedicated.removeQueue(queueDeleteMsg);
|
||||||
partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine));
|
partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine));
|
||||||
verifyPartitionChangeEvent(event -> {
|
verifyPartitionChangeEvent(event -> {
|
||||||
return event.getQueueKey().getTenantId().equals(tenantId) &&
|
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId);
|
||||||
event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) &&
|
return event.getPartitionsMap().get(queueKey).isEmpty();
|
||||||
event.getPartitions().isEmpty();
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -160,7 +160,9 @@ public class DefaultDeviceStateServiceTest {
|
|||||||
Mockito.reset(service, telemetrySubscriptionService);
|
Mockito.reset(service, telemetrySubscriptionService);
|
||||||
ReflectionTestUtils.setField(service, "defaultInactivityTimeoutMs", timeout);
|
ReflectionTestUtils.setField(service, "defaultInactivityTimeoutMs", timeout);
|
||||||
service.init();
|
service.init();
|
||||||
PartitionChangeEvent event = new PartitionChangeEvent(this, new QueueKey(ServiceType.TB_CORE), Collections.singleton(tpi));
|
PartitionChangeEvent event = new PartitionChangeEvent(this, ServiceType.TB_CORE, Map.of(
|
||||||
|
new QueueKey(ServiceType.TB_CORE), Collections.singleton(tpi)
|
||||||
|
));
|
||||||
service.onApplicationEvent(event);
|
service.onApplicationEvent(event);
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -20,8 +20,6 @@ import lombok.Getter;
|
|||||||
import org.thingsboard.server.common.msg.MsgType;
|
import org.thingsboard.server.common.msg.MsgType;
|
||||||
import org.thingsboard.server.common.msg.TbActorMsg;
|
import org.thingsboard.server.common.msg.TbActorMsg;
|
||||||
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Andrew Shvayka
|
* @author Andrew Shvayka
|
||||||
*/
|
*/
|
||||||
@ -30,8 +28,6 @@ public final class PartitionChangeMsg implements TbActorMsg {
|
|||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
private final ServiceType serviceType;
|
private final ServiceType serviceType;
|
||||||
@Getter
|
|
||||||
private final Set<TopicPartitionInfo> partitions;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MsgType getMsgType() {
|
public MsgType getMsgType() {
|
||||||
|
|||||||
@ -271,6 +271,8 @@ public class HashPartitionService implements PartitionService {
|
|||||||
final ConcurrentMap<QueueKey, List<Integer>> oldPartitions = myPartitions;
|
final ConcurrentMap<QueueKey, List<Integer>> oldPartitions = myPartitions;
|
||||||
myPartitions = newPartitions;
|
myPartitions = newPartitions;
|
||||||
|
|
||||||
|
Map<QueueKey, Set<TopicPartitionInfo>> changedPartitionsMap = new HashMap<>();
|
||||||
|
|
||||||
Set<QueueKey> removed = new HashSet<>();
|
Set<QueueKey> removed = new HashSet<>();
|
||||||
oldPartitions.forEach((queueKey, partitions) -> {
|
oldPartitions.forEach((queueKey, partitions) -> {
|
||||||
if (!newPartitions.containsKey(queueKey)) {
|
if (!newPartitions.containsKey(queueKey)) {
|
||||||
@ -286,7 +288,7 @@ public class HashPartitionService implements PartitionService {
|
|||||||
}
|
}
|
||||||
removed.forEach(queueKey -> {
|
removed.forEach(queueKey -> {
|
||||||
log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", queueKey);
|
log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", queueKey);
|
||||||
applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, queueKey, Collections.emptySet()));
|
changedPartitionsMap.put(queueKey, Collections.emptySet());
|
||||||
});
|
});
|
||||||
|
|
||||||
myPartitions.forEach((queueKey, partitions) -> {
|
myPartitions.forEach((queueKey, partitions) -> {
|
||||||
@ -295,9 +297,19 @@ public class HashPartitionService implements PartitionService {
|
|||||||
Set<TopicPartitionInfo> tpiList = partitions.stream()
|
Set<TopicPartitionInfo> tpiList = partitions.stream()
|
||||||
.map(partition -> buildTopicPartitionInfo(queueKey, partition))
|
.map(partition -> buildTopicPartitionInfo(queueKey, partition))
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, queueKey, tpiList));
|
changedPartitionsMap.put(queueKey, tpiList);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
if (!changedPartitionsMap.isEmpty()) {
|
||||||
|
Map<ServiceType, Map<QueueKey, Set<TopicPartitionInfo>>> partitionsByServiceType = new HashMap<>();
|
||||||
|
changedPartitionsMap.forEach((queueKey, partitions) -> {
|
||||||
|
partitionsByServiceType.computeIfAbsent(queueKey.getType(), serviceType -> new HashMap<>())
|
||||||
|
.put(queueKey, partitions);
|
||||||
|
});
|
||||||
|
partitionsByServiceType.forEach((serviceType, partitionsMap) -> {
|
||||||
|
applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceType, partitionsMap));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
if (currentOtherServices == null) {
|
if (currentOtherServices == null) {
|
||||||
currentOtherServices = new ArrayList<>(otherServices);
|
currentOtherServices = new ArrayList<>(otherServices);
|
||||||
|
|||||||
@ -21,6 +21,8 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
|
|||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||||
import org.thingsboard.server.queue.discovery.QueueKey;
|
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
@ToString(callSuper = true)
|
@ToString(callSuper = true)
|
||||||
@ -29,17 +31,19 @@ public class PartitionChangeEvent extends TbApplicationEvent {
|
|||||||
private static final long serialVersionUID = -8731788167026510559L;
|
private static final long serialVersionUID = -8731788167026510559L;
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
private final QueueKey queueKey;
|
private final ServiceType serviceType;
|
||||||
@Getter
|
@Getter
|
||||||
private final Set<TopicPartitionInfo> partitions;
|
private final Map<QueueKey, Set<TopicPartitionInfo>> partitionsMap;
|
||||||
|
|
||||||
public PartitionChangeEvent(Object source, QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
|
public PartitionChangeEvent(Object source, ServiceType serviceType, Map<QueueKey, Set<TopicPartitionInfo>> partitionsMap) {
|
||||||
super(source);
|
super(source);
|
||||||
this.queueKey = queueKey;
|
this.serviceType = serviceType;
|
||||||
this.partitions = partitions;
|
this.partitionsMap = partitionsMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ServiceType getServiceType() {
|
// only for service types that have single QueueKey
|
||||||
return queueKey.getType();
|
public Set<TopicPartitionInfo> getPartitions() {
|
||||||
|
return partitionsMap.values().stream().findAny().orElse(Collections.emptySet());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user