Merge pull request #9248 from thingsboard/fix/isolated-queues-partitions
Isolated queues: fix partitions recalculation
This commit is contained in:
		
						commit
						cf285e25ad
					
				@ -501,6 +501,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
 | 
				
			|||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					        partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE)));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
 | 
					    private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
 | 
				
			||||||
 | 
				
			|||||||
@ -23,6 +23,7 @@ import org.junit.Assert;
 | 
				
			|||||||
import org.junit.Before;
 | 
					import org.junit.Before;
 | 
				
			||||||
import org.junit.Test;
 | 
					import org.junit.Test;
 | 
				
			||||||
import org.junit.runner.RunWith;
 | 
					import org.junit.runner.RunWith;
 | 
				
			||||||
 | 
					import org.mockito.Mockito;
 | 
				
			||||||
import org.mockito.junit.MockitoJUnitRunner;
 | 
					import org.mockito.junit.MockitoJUnitRunner;
 | 
				
			||||||
import org.springframework.context.ApplicationEventPublisher;
 | 
					import org.springframework.context.ApplicationEventPublisher;
 | 
				
			||||||
import org.springframework.test.util.ReflectionTestUtils;
 | 
					import org.springframework.test.util.ReflectionTestUtils;
 | 
				
			||||||
@ -35,7 +36,9 @@ import org.thingsboard.server.common.data.id.UUIDBased;
 | 
				
			|||||||
import org.thingsboard.server.common.data.queue.Queue;
 | 
					import org.thingsboard.server.common.data.queue.Queue;
 | 
				
			||||||
import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
					import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
				
			||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
					import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
				
			||||||
 | 
					import org.thingsboard.server.gen.transport.TransportProtos;
 | 
				
			||||||
import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo;
 | 
					import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo;
 | 
				
			||||||
 | 
					import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import java.text.SimpleDateFormat;
 | 
					import java.text.SimpleDateFormat;
 | 
				
			||||||
import java.util.ArrayList;
 | 
					import java.util.ArrayList;
 | 
				
			||||||
@ -49,12 +52,17 @@ import java.util.Random;
 | 
				
			|||||||
import java.util.Set;
 | 
					import java.util.Set;
 | 
				
			||||||
import java.util.UUID;
 | 
					import java.util.UUID;
 | 
				
			||||||
import java.util.concurrent.TimeUnit;
 | 
					import java.util.concurrent.TimeUnit;
 | 
				
			||||||
 | 
					import java.util.function.Predicate;
 | 
				
			||||||
import java.util.stream.Collectors;
 | 
					import java.util.stream.Collectors;
 | 
				
			||||||
import java.util.stream.Stream;
 | 
					import java.util.stream.Stream;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
					import static org.assertj.core.api.Assertions.assertThat;
 | 
				
			||||||
 | 
					import static org.mockito.ArgumentMatchers.any;
 | 
				
			||||||
 | 
					import static org.mockito.ArgumentMatchers.argThat;
 | 
				
			||||||
import static org.mockito.ArgumentMatchers.eq;
 | 
					import static org.mockito.ArgumentMatchers.eq;
 | 
				
			||||||
import static org.mockito.Mockito.mock;
 | 
					import static org.mockito.Mockito.mock;
 | 
				
			||||||
 | 
					import static org.mockito.Mockito.never;
 | 
				
			||||||
 | 
					import static org.mockito.Mockito.verify;
 | 
				
			||||||
import static org.mockito.Mockito.when;
 | 
					import static org.mockito.Mockito.when;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@Slf4j
 | 
					@Slf4j
 | 
				
			||||||
@ -78,15 +86,7 @@ public class HashPartitionServiceTest {
 | 
				
			|||||||
        applicationEventPublisher = mock(ApplicationEventPublisher.class);
 | 
					        applicationEventPublisher = mock(ApplicationEventPublisher.class);
 | 
				
			||||||
        routingInfoService = mock(TenantRoutingInfoService.class);
 | 
					        routingInfoService = mock(TenantRoutingInfoService.class);
 | 
				
			||||||
        queueRoutingInfoService = mock(QueueRoutingInfoService.class);
 | 
					        queueRoutingInfoService = mock(QueueRoutingInfoService.class);
 | 
				
			||||||
        clusterRoutingService = new HashPartitionService(discoveryService,
 | 
					        clusterRoutingService = createPartitionService();
 | 
				
			||||||
                routingInfoService,
 | 
					 | 
				
			||||||
                applicationEventPublisher,
 | 
					 | 
				
			||||||
                queueRoutingInfoService);
 | 
					 | 
				
			||||||
        ReflectionTestUtils.setField(clusterRoutingService, "coreTopic", "tb.core");
 | 
					 | 
				
			||||||
        ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 10);
 | 
					 | 
				
			||||||
        ReflectionTestUtils.setField(clusterRoutingService, "vcTopic", "tb.vc");
 | 
					 | 
				
			||||||
        ReflectionTestUtils.setField(clusterRoutingService, "vcPartitions", 10);
 | 
					 | 
				
			||||||
        ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName);
 | 
					 | 
				
			||||||
        ServiceInfo currentServer = ServiceInfo.newBuilder()
 | 
					        ServiceInfo currentServer = ServiceInfo.newBuilder()
 | 
				
			||||||
                .setServiceId("tb-core-0")
 | 
					                .setServiceId("tb-core-0")
 | 
				
			||||||
                .addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name()))
 | 
					                .addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name()))
 | 
				
			||||||
@ -101,8 +101,6 @@ public class HashPartitionServiceTest {
 | 
				
			|||||||
                    .build());
 | 
					                    .build());
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        clusterRoutingService.init();
 | 
					 | 
				
			||||||
        clusterRoutingService.partitionsInit();
 | 
					 | 
				
			||||||
        clusterRoutingService.recalculatePartitions(currentServer, otherServers);
 | 
					        clusterRoutingService.recalculatePartitions(currentServer, otherServers);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -194,25 +192,12 @@ public class HashPartitionServiceTest {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        List<Queue> queues = new ArrayList<>();
 | 
					        List<Queue> queues = new ArrayList<>();
 | 
				
			||||||
        Queue systemQueue = new Queue();
 | 
					        queues.add(createQueue(TenantId.SYS_TENANT_ID, 10));
 | 
				
			||||||
        systemQueue.setTenantId(TenantId.SYS_TENANT_ID);
 | 
					 | 
				
			||||||
        systemQueue.setName("Main");
 | 
					 | 
				
			||||||
        systemQueue.setTopic(DataConstants.MAIN_QUEUE_TOPIC);
 | 
					 | 
				
			||||||
        systemQueue.setPartitions(10);
 | 
					 | 
				
			||||||
        systemQueue.setId(new QueueId(UUID.randomUUID()));
 | 
					 | 
				
			||||||
        queues.add(systemQueue);
 | 
					 | 
				
			||||||
        tenants.forEach((tenantId, profileId) -> {
 | 
					        tenants.forEach((tenantId, profileId) -> {
 | 
				
			||||||
            Queue isolatedQueue = new Queue();
 | 
					            queues.add(createQueue(tenantId, 2));
 | 
				
			||||||
            isolatedQueue.setTenantId(tenantId);
 | 
					            mockRoutingInfo(tenantId, profileId, true);
 | 
				
			||||||
            isolatedQueue.setName("Main");
 | 
					 | 
				
			||||||
            isolatedQueue.setTopic(DataConstants.MAIN_QUEUE_TOPIC);
 | 
					 | 
				
			||||||
            isolatedQueue.setPartitions(2);
 | 
					 | 
				
			||||||
            isolatedQueue.setId(new QueueId(UUID.randomUUID()));
 | 
					 | 
				
			||||||
            queues.add(isolatedQueue);
 | 
					 | 
				
			||||||
            when(routingInfoService.getRoutingInfo(eq(tenantId))).thenReturn(new TenantRoutingInfo(tenantId, profileId, true));
 | 
					 | 
				
			||||||
        });
 | 
					        });
 | 
				
			||||||
        when(queueRoutingInfoService.getAllQueuesRoutingInfo()).thenReturn(queues.stream()
 | 
					        mockQueues(queues);
 | 
				
			||||||
                .map(QueueRoutingInfo::new).collect(Collectors.toList()));
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        List<ServiceInfo> ruleEngines = new ArrayList<>();
 | 
					        List<ServiceInfo> ruleEngines = new ArrayList<>();
 | 
				
			||||||
        Map<TenantProfileId, List<ServiceInfo>> dedicatedServers = new HashMap<>();
 | 
					        Map<TenantProfileId, List<ServiceInfo>> dedicatedServers = new HashMap<>();
 | 
				
			||||||
@ -275,6 +260,97 @@ public class HashPartitionServiceTest {
 | 
				
			|||||||
        });
 | 
					        });
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Test
 | 
				
			||||||
 | 
					    public void testPartitionChangeEvents_isolatedProfile_oneCommonServer_oneDedicated() {
 | 
				
			||||||
 | 
					        ServiceInfo commonRuleEngine = ServiceInfo.newBuilder()
 | 
				
			||||||
 | 
					                .setServiceId("tb-rule-engine-1")
 | 
				
			||||||
 | 
					                .addAllServiceTypes(List.of(ServiceType.TB_RULE_ENGINE.name()))
 | 
				
			||||||
 | 
					                .build();
 | 
				
			||||||
 | 
					        TenantProfileId tenantProfileId = new TenantProfileId(UUID.randomUUID());
 | 
				
			||||||
 | 
					        ServiceInfo dedicatedRuleEngine = ServiceInfo.newBuilder()
 | 
				
			||||||
 | 
					                .setServiceId("tb-rule-engine-isolated-1")
 | 
				
			||||||
 | 
					                .addAllServiceTypes(List.of(ServiceType.TB_RULE_ENGINE.name()))
 | 
				
			||||||
 | 
					                .addAssignedTenantProfiles(tenantProfileId.toString())
 | 
				
			||||||
 | 
					                .build();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        List<Queue> queues = new ArrayList<>();
 | 
				
			||||||
 | 
					        Queue systemQueue = createQueue(TenantId.SYS_TENANT_ID, 10);
 | 
				
			||||||
 | 
					        queues.add(systemQueue);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        TenantId tenantId = new TenantId(UUID.randomUUID());
 | 
				
			||||||
 | 
					        mockRoutingInfo(tenantId, tenantProfileId, false); // not isolated yet
 | 
				
			||||||
 | 
					        mockQueues(queues);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        when(discoveryService.isService(eq(ServiceType.TB_RULE_ENGINE))).thenReturn(true);
 | 
				
			||||||
 | 
					        Mockito.reset(applicationEventPublisher);
 | 
				
			||||||
 | 
					        HashPartitionService partitionService_common = createPartitionService();
 | 
				
			||||||
 | 
					        partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine));
 | 
				
			||||||
 | 
					        verifyPartitionChangeEvent(event -> {
 | 
				
			||||||
 | 
					            return event.getQueueKey().getTenantId().isSysTenantId() &&
 | 
				
			||||||
 | 
					                    event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) &&
 | 
				
			||||||
 | 
					                    event.getPartitions().stream().map(TopicPartitionInfo::getPartition).collect(Collectors.toSet())
 | 
				
			||||||
 | 
					                            .size() == systemQueue.getPartitions();
 | 
				
			||||||
 | 
					        });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        Mockito.reset(applicationEventPublisher);
 | 
				
			||||||
 | 
					        HashPartitionService partitionService_dedicated = createPartitionService();
 | 
				
			||||||
 | 
					        partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine));
 | 
				
			||||||
 | 
					        verify(applicationEventPublisher, never()).publishEvent(any(PartitionChangeEvent.class));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        Queue isolatedQueue = createQueue(tenantId, 3);
 | 
				
			||||||
 | 
					        queues.add(isolatedQueue);
 | 
				
			||||||
 | 
					        mockQueues(queues);
 | 
				
			||||||
 | 
					        mockRoutingInfo(tenantId, tenantProfileId, true); // making isolated
 | 
				
			||||||
 | 
					        TransportProtos.QueueUpdateMsg queueUpdateMsg = TransportProtos.QueueUpdateMsg.newBuilder()
 | 
				
			||||||
 | 
					                .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
 | 
				
			||||||
 | 
					                .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
 | 
				
			||||||
 | 
					                .setQueueIdMSB(isolatedQueue.getUuidId().getMostSignificantBits())
 | 
				
			||||||
 | 
					                .setQueueIdLSB(isolatedQueue.getUuidId().getLeastSignificantBits())
 | 
				
			||||||
 | 
					                .setQueueName(isolatedQueue.getName())
 | 
				
			||||||
 | 
					                .setQueueTopic(isolatedQueue.getTopic())
 | 
				
			||||||
 | 
					                .setPartitions(isolatedQueue.getPartitions())
 | 
				
			||||||
 | 
					                .build();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        partitionService_common.updateQueue(queueUpdateMsg);
 | 
				
			||||||
 | 
					        partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine));
 | 
				
			||||||
 | 
					        // expecting event about no partitions for isolated queue key
 | 
				
			||||||
 | 
					        verifyPartitionChangeEvent(event -> {
 | 
				
			||||||
 | 
					            return event.getQueueKey().getTenantId().equals(tenantId) &&
 | 
				
			||||||
 | 
					                    event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) &&
 | 
				
			||||||
 | 
					                    event.getPartitions().isEmpty();
 | 
				
			||||||
 | 
					        });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        partitionService_dedicated.updateQueue(queueUpdateMsg);
 | 
				
			||||||
 | 
					        partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine));
 | 
				
			||||||
 | 
					        verifyPartitionChangeEvent(event -> {
 | 
				
			||||||
 | 
					            return event.getQueueKey().getTenantId().equals(tenantId) &&
 | 
				
			||||||
 | 
					                    event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) &&
 | 
				
			||||||
 | 
					                    event.getPartitions().stream().map(TopicPartitionInfo::getPartition).collect(Collectors.toSet())
 | 
				
			||||||
 | 
					                            .size() == isolatedQueue.getPartitions();
 | 
				
			||||||
 | 
					        });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        queues = List.of(systemQueue);
 | 
				
			||||||
 | 
					        mockQueues(queues);
 | 
				
			||||||
 | 
					        mockRoutingInfo(tenantId, tenantProfileId, false); // turning off isolation
 | 
				
			||||||
 | 
					        Mockito.reset(applicationEventPublisher);
 | 
				
			||||||
 | 
					        TransportProtos.QueueDeleteMsg queueDeleteMsg = TransportProtos.QueueDeleteMsg.newBuilder()
 | 
				
			||||||
 | 
					                .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
 | 
				
			||||||
 | 
					                .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
 | 
				
			||||||
 | 
					                .setQueueIdMSB(isolatedQueue.getUuidId().getMostSignificantBits())
 | 
				
			||||||
 | 
					                .setQueueIdLSB(isolatedQueue.getUuidId().getLeastSignificantBits())
 | 
				
			||||||
 | 
					                .setQueueName(isolatedQueue.getName())
 | 
				
			||||||
 | 
					                .build();
 | 
				
			||||||
 | 
					        partitionService_dedicated.removeQueue(queueDeleteMsg);
 | 
				
			||||||
 | 
					        partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine));
 | 
				
			||||||
 | 
					        verifyPartitionChangeEvent(event -> {
 | 
				
			||||||
 | 
					            return event.getQueueKey().getTenantId().equals(tenantId) &&
 | 
				
			||||||
 | 
					                    event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) &&
 | 
				
			||||||
 | 
					                    event.getPartitions().isEmpty();
 | 
				
			||||||
 | 
					        });
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Test
 | 
					    @Test
 | 
				
			||||||
    public void testIsManagedByCurrentServiceCheck() {
 | 
					    public void testIsManagedByCurrentServiceCheck() {
 | 
				
			||||||
        TenantProfileId isolatedProfileId = new TenantProfileId(UUID.randomUUID());
 | 
					        TenantProfileId isolatedProfileId = new TenantProfileId(UUID.randomUUID());
 | 
				
			||||||
@ -282,9 +358,9 @@ public class HashPartitionServiceTest {
 | 
				
			|||||||
        TenantProfileId regularProfileId = new TenantProfileId(UUID.randomUUID());
 | 
					        TenantProfileId regularProfileId = new TenantProfileId(UUID.randomUUID());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        TenantId isolatedTenantId = new TenantId(UUID.randomUUID());
 | 
					        TenantId isolatedTenantId = new TenantId(UUID.randomUUID());
 | 
				
			||||||
        when(routingInfoService.getRoutingInfo(eq(isolatedTenantId))).thenReturn(new TenantRoutingInfo(isolatedTenantId, isolatedProfileId, true));
 | 
					        mockRoutingInfo(isolatedTenantId, isolatedProfileId, true);
 | 
				
			||||||
        TenantId regularTenantId = new TenantId(UUID.randomUUID());
 | 
					        TenantId regularTenantId = new TenantId(UUID.randomUUID());
 | 
				
			||||||
        when(routingInfoService.getRoutingInfo(eq(regularTenantId))).thenReturn(new TenantRoutingInfo(regularTenantId, regularProfileId, false));
 | 
					        mockRoutingInfo(regularTenantId, regularProfileId, false);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        assertThat(clusterRoutingService.isManagedByCurrentService(isolatedTenantId)).isTrue();
 | 
					        assertThat(clusterRoutingService.isManagedByCurrentService(isolatedTenantId)).isTrue();
 | 
				
			||||||
        assertThat(clusterRoutingService.isManagedByCurrentService(regularTenantId)).isFalse();
 | 
					        assertThat(clusterRoutingService.isManagedByCurrentService(regularTenantId)).isFalse();
 | 
				
			||||||
@ -296,4 +372,43 @@ public class HashPartitionServiceTest {
 | 
				
			|||||||
        assertThat(clusterRoutingService.isManagedByCurrentService(regularTenantId)).isTrue();
 | 
					        assertThat(clusterRoutingService.isManagedByCurrentService(regularTenantId)).isTrue();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private void verifyPartitionChangeEvent(Predicate<PartitionChangeEvent> predicate) {
 | 
				
			||||||
 | 
					        verify(applicationEventPublisher).publishEvent(argThat(event -> event instanceof PartitionChangeEvent && predicate.test((PartitionChangeEvent) event)));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private void mockRoutingInfo(TenantId tenantId, TenantProfileId tenantProfileId, boolean isolatedTbRuleEngine) {
 | 
				
			||||||
 | 
					        when(routingInfoService.getRoutingInfo(eq(tenantId)))
 | 
				
			||||||
 | 
					                .thenReturn(new TenantRoutingInfo(tenantId, tenantProfileId, isolatedTbRuleEngine));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private void mockQueues(List<Queue> queues) {
 | 
				
			||||||
 | 
					        when(queueRoutingInfoService.getAllQueuesRoutingInfo()).thenReturn(queues.stream()
 | 
				
			||||||
 | 
					                .map(QueueRoutingInfo::new).collect(Collectors.toList()));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private Queue createQueue(TenantId tenantId, int partitions) {
 | 
				
			||||||
 | 
					        Queue systemQueue = new Queue();
 | 
				
			||||||
 | 
					        systemQueue.setTenantId(tenantId);
 | 
				
			||||||
 | 
					        systemQueue.setName("Main");
 | 
				
			||||||
 | 
					        systemQueue.setTopic(DataConstants.MAIN_QUEUE_TOPIC);
 | 
				
			||||||
 | 
					        systemQueue.setPartitions(partitions);
 | 
				
			||||||
 | 
					        systemQueue.setId(new QueueId(UUID.randomUUID()));
 | 
				
			||||||
 | 
					        return systemQueue;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private HashPartitionService createPartitionService() {
 | 
				
			||||||
 | 
					        HashPartitionService partitionService = new HashPartitionService(discoveryService,
 | 
				
			||||||
 | 
					                routingInfoService,
 | 
				
			||||||
 | 
					                applicationEventPublisher,
 | 
				
			||||||
 | 
					                queueRoutingInfoService);
 | 
				
			||||||
 | 
					        ReflectionTestUtils.setField(partitionService, "coreTopic", "tb.core");
 | 
				
			||||||
 | 
					        ReflectionTestUtils.setField(partitionService, "corePartitions", 10);
 | 
				
			||||||
 | 
					        ReflectionTestUtils.setField(partitionService, "vcTopic", "tb.vc");
 | 
				
			||||||
 | 
					        ReflectionTestUtils.setField(partitionService, "vcPartitions", 10);
 | 
				
			||||||
 | 
					        ReflectionTestUtils.setField(partitionService, "hashFunctionName", hashFunctionName);
 | 
				
			||||||
 | 
					        partitionService.init();
 | 
				
			||||||
 | 
					        partitionService.partitionsInit();
 | 
				
			||||||
 | 
					        return partitionService;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -179,7 +179,6 @@ public class HashPartitionService implements PartitionService {
 | 
				
			|||||||
    public void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) {
 | 
					    public void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) {
 | 
				
			||||||
        TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
 | 
					        TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
 | 
				
			||||||
        QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
 | 
					        QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
 | 
				
			||||||
        myPartitions.remove(queueKey);
 | 
					 | 
				
			||||||
        partitionTopicsMap.remove(queueKey);
 | 
					        partitionTopicsMap.remove(queueKey);
 | 
				
			||||||
        partitionSizesMap.remove(queueKey);
 | 
					        partitionSizesMap.remove(queueKey);
 | 
				
			||||||
        //TODO: remove after merging tb entity services
 | 
					        //TODO: remove after merging tb entity services
 | 
				
			||||||
@ -272,11 +271,22 @@ public class HashPartitionService implements PartitionService {
 | 
				
			|||||||
        final ConcurrentMap<QueueKey, List<Integer>> oldPartitions = myPartitions;
 | 
					        final ConcurrentMap<QueueKey, List<Integer>> oldPartitions = myPartitions;
 | 
				
			||||||
        myPartitions = newPartitions;
 | 
					        myPartitions = newPartitions;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        Set<QueueKey> removed = new HashSet<>();
 | 
				
			||||||
        oldPartitions.forEach((queueKey, partitions) -> {
 | 
					        oldPartitions.forEach((queueKey, partitions) -> {
 | 
				
			||||||
            if (!myPartitions.containsKey(queueKey)) {
 | 
					            if (!newPartitions.containsKey(queueKey)) {
 | 
				
			||||||
 | 
					                removed.add(queueKey);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        });
 | 
				
			||||||
 | 
					        if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) {
 | 
				
			||||||
 | 
					            partitionSizesMap.keySet().stream()
 | 
				
			||||||
 | 
					                    .filter(queueKey -> queueKey.getType() == ServiceType.TB_RULE_ENGINE &&
 | 
				
			||||||
 | 
					                            !queueKey.getTenantId().isSysTenantId() &&
 | 
				
			||||||
 | 
					                            !newPartitions.containsKey(queueKey))
 | 
				
			||||||
 | 
					                    .forEach(removed::add);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        removed.forEach(queueKey -> {
 | 
				
			||||||
            log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", queueKey);
 | 
					            log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", queueKey);
 | 
				
			||||||
            applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, queueKey, Collections.emptySet()));
 | 
					            applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, queueKey, Collections.emptySet()));
 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        });
 | 
					        });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        myPartitions.forEach((queueKey, partitions) -> {
 | 
					        myPartitions.forEach((queueKey, partitions) -> {
 | 
				
			||||||
@ -306,7 +316,11 @@ public class HashPartitionService implements PartitionService {
 | 
				
			|||||||
            if (!changes.isEmpty()) {
 | 
					            if (!changes.isEmpty()) {
 | 
				
			||||||
                applicationEventPublisher.publishEvent(new ClusterTopologyChangeEvent(this, changes));
 | 
					                applicationEventPublisher.publishEvent(new ClusterTopologyChangeEvent(this, changes));
 | 
				
			||||||
                responsibleServices.forEach((profileId, serviceInfos) -> {
 | 
					                responsibleServices.forEach((profileId, serviceInfos) -> {
 | 
				
			||||||
 | 
					                    if (profileId != null) {
 | 
				
			||||||
                        log.info("Servers responsible for tenant profile {}: {}", profileId, toServiceIds(serviceInfos));
 | 
					                        log.info("Servers responsible for tenant profile {}: {}", profileId, toServiceIds(serviceInfos));
 | 
				
			||||||
 | 
					                    } else {
 | 
				
			||||||
 | 
					                        log.info("Servers responsible for system queues: {}", toServiceIds(serviceInfos));
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
                });
 | 
					                });
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user