resolving partition idx by hash function
This commit is contained in:
		
							parent
							
								
									e97161c107
								
							
						
					
					
						commit
						0e9eee4f66
					
				@ -13,7 +13,7 @@
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.cluster.routing;
 | 
			
		||||
package org.thingsboard.server.queue.discovery;
 | 
			
		||||
 | 
			
		||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
@ -29,10 +29,6 @@ import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.HashPartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.QueueRoutingInfoService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
@ -111,13 +107,45 @@ public class HashPartitionServiceTest {
 | 
			
		||||
            map.put(partition, map.getOrDefault(partition, 0) + 1);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        List<Map.Entry<Integer, Integer>> data = map.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getValue)).collect(Collectors.toList());
 | 
			
		||||
        printDispersion(start, map);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testDispersionOnResolveByPartitionIdx() {
 | 
			
		||||
        int serverCount = 10;
 | 
			
		||||
        int queueCount = 1000;
 | 
			
		||||
        int partitionCount = 3;
 | 
			
		||||
 | 
			
		||||
        List<TransportProtos.ServiceInfo> services = new ArrayList<>();
 | 
			
		||||
 | 
			
		||||
        for (int i = 0; i < serverCount; i++) {
 | 
			
		||||
            services.add(TransportProtos.ServiceInfo.newBuilder().setServiceId("RE-" + i).build());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        long start = System.currentTimeMillis();
 | 
			
		||||
        Map<String, Integer> map = new HashMap<>();
 | 
			
		||||
        services.forEach(s -> map.put(s.getServiceId(), 0));
 | 
			
		||||
 | 
			
		||||
        for (int queueIndex = 0; queueIndex < queueCount; queueIndex++) {
 | 
			
		||||
            for (int partition = 0; partition < partitionCount; partition++) {
 | 
			
		||||
                TopicPartitionInfo tpi = new TopicPartitionInfo("tb_rule_engine.queue_" + queueIndex, TenantId.SYS_TENANT_ID, partition, false);
 | 
			
		||||
                TransportProtos.ServiceInfo serviceInfo = clusterRoutingService.resolveByPartitionIdx(services, tpi);
 | 
			
		||||
                String serviceId = serviceInfo.getServiceId();
 | 
			
		||||
                map.put(serviceId, map.get(serviceId) + 1);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        printDispersion(start, map);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private <T> void printDispersion(long start, Map<T, Integer> map) {
 | 
			
		||||
        List<Map.Entry<T, Integer>> data = map.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getValue)).collect(Collectors.toList());
 | 
			
		||||
        long end = System.currentTimeMillis();
 | 
			
		||||
        double diff = (data.get(data.size() - 1).getValue() - data.get(0).getValue());
 | 
			
		||||
        double diffPercent = (diff / ITERATIONS) * 100.0;
 | 
			
		||||
        System.out.println("Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", diffPercent) + "%)");
 | 
			
		||||
        Assert.assertTrue(diffPercent < 0.5);
 | 
			
		||||
        for (Map.Entry<Integer, Integer> entry : data) {
 | 
			
		||||
        for (Map.Entry<T, Integer> entry : data) {
 | 
			
		||||
            System.out.println(entry.getKey() + ": " + entry.getValue());
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -253,8 +253,12 @@ public class HashPartitionService implements PartitionService {
 | 
			
		||||
        ConcurrentMap<QueueKey, List<Integer>> oldPartitions = myPartitions;
 | 
			
		||||
        myPartitions = new ConcurrentHashMap<>();
 | 
			
		||||
        partitionSizesMap.forEach((queueKey, size) -> {
 | 
			
		||||
            TenantId tenantId = queueKey.getTenantId();
 | 
			
		||||
            String topic = partitionTopicsMap.get(queueKey);
 | 
			
		||||
 | 
			
		||||
            for (int i = 0; i < size; i++) {
 | 
			
		||||
                ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), i);
 | 
			
		||||
                TopicPartitionInfo tpi = new TopicPartitionInfo(topic, tenantId, i, false);
 | 
			
		||||
                ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), tpi);
 | 
			
		||||
                if (currentService.equals(serviceInfo)) {
 | 
			
		||||
                    myPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i);
 | 
			
		||||
                }
 | 
			
		||||
@ -434,11 +438,14 @@ public class HashPartitionService implements PartitionService {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ServiceInfo resolveByPartitionIdx(List<ServiceInfo> servers, Integer partitionIdx) {
 | 
			
		||||
    protected ServiceInfo resolveByPartitionIdx(List<ServiceInfo> servers, TopicPartitionInfo tpi) {
 | 
			
		||||
        if (servers == null || servers.isEmpty()) {
 | 
			
		||||
            return null;
 | 
			
		||||
        }
 | 
			
		||||
        return servers.get(partitionIdx % servers.size());
 | 
			
		||||
 | 
			
		||||
        int hash = hashFunction.newHasher().putInt(tpi.hashCode()).hash().asInt();
 | 
			
		||||
 | 
			
		||||
        return servers.get(Math.abs(hash % servers.size()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static HashFunction forName(String name) {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user