Merge branch 'develop/3.6' of github.com:thingsboard/thingsboard into develop/3.6
This commit is contained in:
		
						commit
						4b99a3a00b
					
				@ -22,7 +22,7 @@ ThingsBoard documentation is hosted on [thingsboard.io](https://thingsboard.io/d
 | 
			
		||||
[](https://thingsboard.io/smart-farming/)
 | 
			
		||||
 | 
			
		||||
[**IoT Rule Engine**](https://thingsboard.io/docs/user-guide/rule-engine-2-0/re-getting-started/)
 | 
			
		||||
[](https://thingsboard.io/docs/user-guide/rule-engine-2-0/re-getting-started/)
 | 
			
		||||
[](https://thingsboard.io/docs/user-guide/rule-engine-2-0/re-getting-started/)
 | 
			
		||||
 | 
			
		||||
[**Smart metering**](https://thingsboard.io/smart-metering/)
 | 
			
		||||
[](https://thingsboard.io/smart-metering/)
 | 
			
		||||
 | 
			
		||||
@ -199,6 +199,7 @@ export HTTP_BIND_PORT=18080
 | 
			
		||||
export MQTT_BIND_PORT=11883
 | 
			
		||||
export COAP_BIND_PORT=15683
 | 
			
		||||
export LWM2M_ENABLED=false
 | 
			
		||||
export SNMP_ENABLED=false
 | 
			
		||||
EOL'
 | 
			
		||||
{:copy-code}
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
@ -131,6 +131,7 @@ export HTTP_BIND_PORT=18080
 | 
			
		||||
export MQTT_BIND_PORT=11883
 | 
			
		||||
export COAP_BIND_PORT=15683
 | 
			
		||||
export LWM2M_ENABLED=false
 | 
			
		||||
export SNMP_ENABLED=false
 | 
			
		||||
EOL'
 | 
			
		||||
{:copy-code}
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
@ -121,7 +121,7 @@ public class DefaultActorService extends TbApplicationEventListener<PartitionCha
 | 
			
		||||
    @Override
 | 
			
		||||
    protected void onTbApplicationEvent(PartitionChangeEvent event) {
 | 
			
		||||
        log.info("Received partition change event.");
 | 
			
		||||
        this.appActor.tellWithHighPriority(new PartitionChangeMsg(event.getQueueKey().getType(), event.getPartitions()));
 | 
			
		||||
        this.appActor.tellWithHighPriority(new PartitionChangeMsg(event.getServiceType()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreDestroy
 | 
			
		||||
 | 
			
		||||
@ -183,19 +183,20 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
 | 
			
		||||
    @Override
 | 
			
		||||
    protected void onTbApplicationEvent(PartitionChangeEvent event) {
 | 
			
		||||
        if (event.getServiceType().equals(getServiceType())) {
 | 
			
		||||
            String serviceQueue = event.getQueueKey().getQueueName();
 | 
			
		||||
            log.info("[{}] Subscribing to partitions: {}", serviceQueue, event.getPartitions());
 | 
			
		||||
            Queue configuration = consumerConfigurations.get(event.getQueueKey());
 | 
			
		||||
            if (configuration == null) {
 | 
			
		||||
                log.warn("Received invalid partition change event for {} that is not managed by this service", event.getQueueKey());
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
            if (!configuration.isConsumerPerPartition()) {
 | 
			
		||||
                consumers.get(event.getQueueKey()).subscribe(event.getPartitions());
 | 
			
		||||
            } else {
 | 
			
		||||
                log.info("[{}] Subscribing consumer per partition: {}", serviceQueue, event.getPartitions());
 | 
			
		||||
                subscribeConsumerPerPartition(event.getQueueKey(), event.getPartitions());
 | 
			
		||||
            }
 | 
			
		||||
            event.getPartitionsMap().forEach((queueKey, partitions) -> {
 | 
			
		||||
                String serviceQueue = queueKey.getQueueName();
 | 
			
		||||
                log.info("[{}] Subscribing to partitions: {}", serviceQueue, partitions);
 | 
			
		||||
                Queue configuration = consumerConfigurations.get(queueKey);
 | 
			
		||||
                if (configuration == null) {
 | 
			
		||||
                    return;
 | 
			
		||||
                }
 | 
			
		||||
                if (!configuration.isConsumerPerPartition()) {
 | 
			
		||||
                    consumers.get(queueKey).subscribe(partitions);
 | 
			
		||||
                } else {
 | 
			
		||||
                    log.info("[{}] Subscribing consumer per partition: {}", serviceQueue, partitions);
 | 
			
		||||
                    subscribeConsumerPerPartition(queueKey, partitions);
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -501,7 +502,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE)));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
 | 
			
		||||
 | 
			
		||||
@ -102,7 +102,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 | 
			
		||||
 | 
			
		||||
@TestPropertySource(properties = {
 | 
			
		||||
        "edges.enabled=true",
 | 
			
		||||
        "queue.rule-engine.stats.enabled=false"
 | 
			
		||||
        "queue.rule-engine.stats.enabled=false",
 | 
			
		||||
        "edges.storage.sleep_between_batches=1000"
 | 
			
		||||
})
 | 
			
		||||
abstract public class AbstractEdgeTest extends AbstractControllerTest {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -96,7 +96,7 @@ public class AssetEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        edgeImitator.expectMessageAmount(1);
 | 
			
		||||
        doDelete("/api/asset/" + savedAsset.getUuidId())
 | 
			
		||||
                .andExpect(status().isOk());
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages(1));
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages(5));
 | 
			
		||||
 | 
			
		||||
        // create asset #2 and assign to edge
 | 
			
		||||
        edgeImitator.expectMessageAmount(2);
 | 
			
		||||
@ -262,9 +262,9 @@ public class AssetEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
    private Asset saveAssetOnCloudAndVerifyDeliveryToEdge() throws Exception {
 | 
			
		||||
        // create asset and assign to edge
 | 
			
		||||
        Asset savedAsset = saveAsset(StringUtils.randomAlphanumeric(15));
 | 
			
		||||
        edgeImitator.expectMessageAmount(1); // asset message
 | 
			
		||||
        edgeImitator.expectMessageAmount(2); // asset and asset profile messages
 | 
			
		||||
        doPost("/api/edge/" + edge.getUuidId()
 | 
			
		||||
                + "/asset/" + savedAsset.getUuidId(), Device.class);
 | 
			
		||||
                + "/asset/" + savedAsset.getUuidId(), Asset.class);
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages());
 | 
			
		||||
        Optional<AssetUpdateMsg> assetUpdateMsgOpt = edgeImitator.findMessageByType(AssetUpdateMsg.class);
 | 
			
		||||
        Assert.assertTrue(assetUpdateMsgOpt.isPresent());
 | 
			
		||||
 | 
			
		||||
@ -42,7 +42,7 @@ public class CustomerEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        Customer customer = new Customer();
 | 
			
		||||
        customer.setTitle("Edge Customer");
 | 
			
		||||
        Customer savedCustomer = doPost("/api/customer", customer, Customer.class);
 | 
			
		||||
        Assert.assertFalse(edgeImitator.waitForMessages(1));
 | 
			
		||||
        Assert.assertFalse(edgeImitator.waitForMessages(5));
 | 
			
		||||
 | 
			
		||||
        // assign edge to customer
 | 
			
		||||
        edgeImitator.expectMessageAmount(2);
 | 
			
		||||
 | 
			
		||||
@ -99,7 +99,7 @@ public class DashboardEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        edgeImitator.expectMessageAmount(1);
 | 
			
		||||
        doDelete("/api/dashboard/" + savedDashboard.getUuidId())
 | 
			
		||||
                .andExpect(status().isOk());
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages(1));
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages(5));
 | 
			
		||||
 | 
			
		||||
        // create dashboard #2 and assign to edge
 | 
			
		||||
        edgeImitator.expectMessageAmount(1);
 | 
			
		||||
 | 
			
		||||
@ -106,7 +106,7 @@ public class DeviceEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        edgeImitator.expectMessageAmount(1);
 | 
			
		||||
        doDelete("/api/device/" + savedDevice.getUuidId())
 | 
			
		||||
                .andExpect(status().isOk());
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages(1));
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages(5));
 | 
			
		||||
 | 
			
		||||
        // create device #2 and assign to edge
 | 
			
		||||
        edgeImitator.expectMessageAmount(2);
 | 
			
		||||
 | 
			
		||||
@ -39,7 +39,7 @@ public class EdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        Customer customer = new Customer();
 | 
			
		||||
        customer.setTitle("Edge Customer");
 | 
			
		||||
        Customer savedCustomer = doPost("/api/customer", customer, Customer.class);
 | 
			
		||||
        Assert.assertFalse(edgeImitator.waitForMessages(1));
 | 
			
		||||
        Assert.assertFalse(edgeImitator.waitForMessages(5));
 | 
			
		||||
 | 
			
		||||
        // assign edge to customer
 | 
			
		||||
        edgeImitator.expectMessageAmount(2);
 | 
			
		||||
 | 
			
		||||
@ -105,7 +105,7 @@ public class EntityViewEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        edgeImitator.expectMessageAmount(1);
 | 
			
		||||
        doDelete("/api/entityView/" + savedEntityView.getUuidId())
 | 
			
		||||
                .andExpect(status().isOk());
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages(1));
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages(5));
 | 
			
		||||
 | 
			
		||||
        // create entity view #2 and assign to edge
 | 
			
		||||
        edgeImitator.expectMessageAmount(1);
 | 
			
		||||
 | 
			
		||||
@ -229,6 +229,6 @@ public class RuleChainEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        edgeImitator.expectMessageAmount(1);
 | 
			
		||||
        doDelete("/api/ruleChain/" + savedRuleChain.getUuidId())
 | 
			
		||||
                .andExpect(status().isOk());
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages(1));
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages(5));
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -120,7 +120,7 @@ public class UserEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        Customer customer = new Customer();
 | 
			
		||||
        customer.setTitle("Edge Customer");
 | 
			
		||||
        Customer savedCustomer = doPost("/api/customer", customer, Customer.class);
 | 
			
		||||
        Assert.assertFalse(edgeImitator.waitForMessages(1));
 | 
			
		||||
        Assert.assertFalse(edgeImitator.waitForMessages(5));
 | 
			
		||||
 | 
			
		||||
        // assign edge to customer
 | 
			
		||||
        edgeImitator.expectMessageAmount(2);
 | 
			
		||||
 | 
			
		||||
@ -173,6 +173,7 @@ public class EdgeImitator {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<List<Void>> processDownlinkMsg(DownlinkMsg downlinkMsg) {
 | 
			
		||||
        log.trace("processDownlinkMsg: {}", downlinkMsg);
 | 
			
		||||
        List<ListenableFuture<Void>> result = new ArrayList<>();
 | 
			
		||||
        if (downlinkMsg.getAdminSettingsUpdateMsgCount() > 0) {
 | 
			
		||||
            for (AdminSettingsUpdateMsg adminSettingsUpdateMsg : downlinkMsg.getAdminSettingsUpdateMsgList()) {
 | 
			
		||||
 | 
			
		||||
@ -286,10 +286,8 @@ public class HashPartitionServiceTest {
 | 
			
		||||
        HashPartitionService partitionService_common = createPartitionService();
 | 
			
		||||
        partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine));
 | 
			
		||||
        verifyPartitionChangeEvent(event -> {
 | 
			
		||||
            return event.getQueueKey().getTenantId().isSysTenantId() &&
 | 
			
		||||
                    event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) &&
 | 
			
		||||
                    event.getPartitions().stream().map(TopicPartitionInfo::getPartition).collect(Collectors.toSet())
 | 
			
		||||
                            .size() == systemQueue.getPartitions();
 | 
			
		||||
            QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TenantId.SYS_TENANT_ID);
 | 
			
		||||
            return event.getPartitionsMap().get(queueKey).size() == systemQueue.getPartitions();
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        Mockito.reset(applicationEventPublisher);
 | 
			
		||||
@ -316,18 +314,15 @@ public class HashPartitionServiceTest {
 | 
			
		||||
        partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine));
 | 
			
		||||
        // expecting event about no partitions for isolated queue key
 | 
			
		||||
        verifyPartitionChangeEvent(event -> {
 | 
			
		||||
            return event.getQueueKey().getTenantId().equals(tenantId) &&
 | 
			
		||||
                    event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) &&
 | 
			
		||||
                    event.getPartitions().isEmpty();
 | 
			
		||||
            QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId);
 | 
			
		||||
            return event.getPartitionsMap().get(queueKey).isEmpty();
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        partitionService_dedicated.updateQueue(queueUpdateMsg);
 | 
			
		||||
        partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine));
 | 
			
		||||
        verifyPartitionChangeEvent(event -> {
 | 
			
		||||
            return event.getQueueKey().getTenantId().equals(tenantId) &&
 | 
			
		||||
                    event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) &&
 | 
			
		||||
                    event.getPartitions().stream().map(TopicPartitionInfo::getPartition).collect(Collectors.toSet())
 | 
			
		||||
                            .size() == isolatedQueue.getPartitions();
 | 
			
		||||
            QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId);
 | 
			
		||||
            return event.getPartitionsMap().get(queueKey).size() == isolatedQueue.getPartitions();
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -343,11 +338,9 @@ public class HashPartitionServiceTest {
 | 
			
		||||
                .setQueueName(isolatedQueue.getName())
 | 
			
		||||
                .build();
 | 
			
		||||
        partitionService_dedicated.removeQueue(queueDeleteMsg);
 | 
			
		||||
        partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine));
 | 
			
		||||
        verifyPartitionChangeEvent(event -> {
 | 
			
		||||
            return event.getQueueKey().getTenantId().equals(tenantId) &&
 | 
			
		||||
                    event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) &&
 | 
			
		||||
                    event.getPartitions().isEmpty();
 | 
			
		||||
            QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId);
 | 
			
		||||
            return event.getPartitionsMap().get(queueKey).isEmpty();
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -160,7 +160,9 @@ public class DefaultDeviceStateServiceTest {
 | 
			
		||||
        Mockito.reset(service, telemetrySubscriptionService);
 | 
			
		||||
        ReflectionTestUtils.setField(service, "defaultInactivityTimeoutMs", timeout);
 | 
			
		||||
        service.init();
 | 
			
		||||
        PartitionChangeEvent event = new PartitionChangeEvent(this, new QueueKey(ServiceType.TB_CORE), Collections.singleton(tpi));
 | 
			
		||||
        PartitionChangeEvent event = new PartitionChangeEvent(this, ServiceType.TB_CORE, Map.of(
 | 
			
		||||
                new QueueKey(ServiceType.TB_CORE), Collections.singleton(tpi)
 | 
			
		||||
        ));
 | 
			
		||||
        service.onApplicationEvent(event);
 | 
			
		||||
        Thread.sleep(100);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -11,8 +11,6 @@ transport.lwm2m.security.trust-credentials.keystore.store_file=lwm2m/credentials
 | 
			
		||||
 | 
			
		||||
# Edge disabled to speed up the context init. Will be enabled by @TestPropertySource in respective tests
 | 
			
		||||
edges.enabled=false
 | 
			
		||||
edges.storage.no_read_records_sleep=500
 | 
			
		||||
edges.storage.sleep_between_batches=500
 | 
			
		||||
actors.rpc.submit_strategy=BURST
 | 
			
		||||
queue.rule-engine.stats.enabled=true
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -20,8 +20,6 @@ import lombok.Getter;
 | 
			
		||||
import org.thingsboard.server.common.msg.MsgType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbActorMsg;
 | 
			
		||||
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @author Andrew Shvayka
 | 
			
		||||
 */
 | 
			
		||||
@ -30,8 +28,6 @@ public final class PartitionChangeMsg implements TbActorMsg {
 | 
			
		||||
 | 
			
		||||
    @Getter
 | 
			
		||||
    private final ServiceType serviceType;
 | 
			
		||||
    @Getter
 | 
			
		||||
    private final Set<TopicPartitionInfo> partitions;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public MsgType getMsgType() {
 | 
			
		||||
 | 
			
		||||
@ -179,10 +179,15 @@ public class HashPartitionService implements PartitionService {
 | 
			
		||||
    public void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) {
 | 
			
		||||
        TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
 | 
			
		||||
        QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
 | 
			
		||||
        myPartitions.remove(queueKey);
 | 
			
		||||
        partitionTopicsMap.remove(queueKey);
 | 
			
		||||
        partitionSizesMap.remove(queueKey);
 | 
			
		||||
        //TODO: remove after merging tb entity services
 | 
			
		||||
        removeTenant(tenantId);
 | 
			
		||||
 | 
			
		||||
        if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) {
 | 
			
		||||
            publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, Map.of(queueKey, Collections.emptySet()));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -271,6 +276,8 @@ public class HashPartitionService implements PartitionService {
 | 
			
		||||
        final ConcurrentMap<QueueKey, List<Integer>> oldPartitions = myPartitions;
 | 
			
		||||
        myPartitions = newPartitions;
 | 
			
		||||
 | 
			
		||||
        Map<QueueKey, Set<TopicPartitionInfo>> changedPartitionsMap = new HashMap<>();
 | 
			
		||||
 | 
			
		||||
        Set<QueueKey> removed = new HashSet<>();
 | 
			
		||||
        oldPartitions.forEach((queueKey, partitions) -> {
 | 
			
		||||
            if (!newPartitions.containsKey(queueKey)) {
 | 
			
		||||
@ -286,7 +293,7 @@ public class HashPartitionService implements PartitionService {
 | 
			
		||||
        }
 | 
			
		||||
        removed.forEach(queueKey -> {
 | 
			
		||||
            log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", queueKey);
 | 
			
		||||
            applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, queueKey, Collections.emptySet()));
 | 
			
		||||
            changedPartitionsMap.put(queueKey, Collections.emptySet());
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        myPartitions.forEach((queueKey, partitions) -> {
 | 
			
		||||
@ -295,9 +302,17 @@ public class HashPartitionService implements PartitionService {
 | 
			
		||||
                Set<TopicPartitionInfo> tpiList = partitions.stream()
 | 
			
		||||
                        .map(partition -> buildTopicPartitionInfo(queueKey, partition))
 | 
			
		||||
                        .collect(Collectors.toSet());
 | 
			
		||||
                applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, queueKey, tpiList));
 | 
			
		||||
                changedPartitionsMap.put(queueKey, tpiList);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
        if (!changedPartitionsMap.isEmpty()) {
 | 
			
		||||
            Map<ServiceType, Map<QueueKey, Set<TopicPartitionInfo>>> partitionsByServiceType = new HashMap<>();
 | 
			
		||||
            changedPartitionsMap.forEach((queueKey, partitions) -> {
 | 
			
		||||
                partitionsByServiceType.computeIfAbsent(queueKey.getType(), serviceType -> new HashMap<>())
 | 
			
		||||
                        .put(queueKey, partitions);
 | 
			
		||||
            });
 | 
			
		||||
            partitionsByServiceType.forEach(this::publishPartitionChangeEvent);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (currentOtherServices == null) {
 | 
			
		||||
            currentOtherServices = new ArrayList<>(otherServices);
 | 
			
		||||
@ -328,6 +343,18 @@ public class HashPartitionService implements PartitionService {
 | 
			
		||||
        applicationEventPublisher.publishEvent(new ServiceListChangedEvent(otherServices, currentService));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void publishPartitionChangeEvent(ServiceType serviceType, Map<QueueKey, Set<TopicPartitionInfo>> partitionsMap) {
 | 
			
		||||
        if (log.isDebugEnabled()) {
 | 
			
		||||
            log.debug("Publishing partition change event for service type " + serviceType + ":" + System.lineSeparator() +
 | 
			
		||||
                    partitionsMap.entrySet().stream()
 | 
			
		||||
                            .map(entry -> entry.getKey() + " - " + entry.getValue().stream()
 | 
			
		||||
                                    .map(TopicPartitionInfo::getFullTopicName).sorted()
 | 
			
		||||
                                    .collect(Collectors.toList()))
 | 
			
		||||
                            .collect(Collectors.joining(System.lineSeparator())));
 | 
			
		||||
        }
 | 
			
		||||
        applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceType, partitionsMap));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public Set<String> getAllServiceIds(ServiceType serviceType) {
 | 
			
		||||
        return getAllServices(serviceType).stream().map(ServiceInfo::getServiceId).collect(Collectors.toSet());
 | 
			
		||||
@ -493,6 +520,9 @@ public class HashPartitionService implements PartitionService {
 | 
			
		||||
                    }
 | 
			
		||||
                    responsibleServices.put(profileId, responsible);
 | 
			
		||||
                }
 | 
			
		||||
                if (responsible.isEmpty()) {
 | 
			
		||||
                    return null;
 | 
			
		||||
                }
 | 
			
		||||
                servers = responsible;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -21,6 +21,8 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.QueueKey;
 | 
			
		||||
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
 | 
			
		||||
@ToString(callSuper = true)
 | 
			
		||||
@ -29,17 +31,19 @@ public class PartitionChangeEvent extends TbApplicationEvent {
 | 
			
		||||
    private static final long serialVersionUID = -8731788167026510559L;
 | 
			
		||||
 | 
			
		||||
    @Getter
 | 
			
		||||
    private final QueueKey queueKey;
 | 
			
		||||
    private final ServiceType serviceType;
 | 
			
		||||
    @Getter
 | 
			
		||||
    private final Set<TopicPartitionInfo> partitions;
 | 
			
		||||
    private final Map<QueueKey, Set<TopicPartitionInfo>> partitionsMap;
 | 
			
		||||
 | 
			
		||||
    public PartitionChangeEvent(Object source, QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
 | 
			
		||||
    public PartitionChangeEvent(Object source, ServiceType serviceType, Map<QueueKey, Set<TopicPartitionInfo>> partitionsMap) {
 | 
			
		||||
        super(source);
 | 
			
		||||
        this.queueKey = queueKey;
 | 
			
		||||
        this.partitions = partitions;
 | 
			
		||||
        this.serviceType = serviceType;
 | 
			
		||||
        this.partitionsMap = partitionsMap;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public ServiceType getServiceType() {
 | 
			
		||||
        return queueKey.getType();
 | 
			
		||||
    // only for service types that have single QueueKey
 | 
			
		||||
    public Set<TopicPartitionInfo> getPartitions() {
 | 
			
		||||
        return partitionsMap.values().stream().findAny().orElse(Collections.emptySet());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1,5 +1,2 @@
 | 
			
		||||
--PostgreSQL specific truncate to fit constraints
 | 
			
		||||
TRUNCATE TABLE device_credentials, device, device_profile, asset, asset_profile, ota_package, rule_node_state, rule_node, rule_chain, alarm_comment, alarm, entity_alarm;
 | 
			
		||||
 | 
			
		||||
-- Decrease seq_id column to make sure to cover cases of new sequential cycle during the tests
 | 
			
		||||
ALTER SEQUENCE edge_event_seq_id_seq MAXVALUE 256;
 | 
			
		||||
TRUNCATE TABLE device_credentials, device, device_profile, asset, asset_profile, ota_package, rule_node_state, rule_node, rule_chain, alarm_comment, alarm, entity_alarm;
 | 
			
		||||
@ -63,44 +63,51 @@
 | 
			
		||||
 | 
			
		||||
  .mat-mdc-dialog-content {
 | 
			
		||||
    max-height: 80vh;
 | 
			
		||||
    padding: 16px;
 | 
			
		||||
    padding: 0 16px 16px 16px;
 | 
			
		||||
    border-top: 16px solid #fff;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
:host ::ng-deep {
 | 
			
		||||
  .tb-markdown-view {
 | 
			
		||||
    padding: 16px 16px 32px 16px;
 | 
			
		||||
    div.code-wrapper button.clipboard-btn {
 | 
			
		||||
      right: -2px !important;
 | 
			
		||||
      p {
 | 
			
		||||
        color: #305680;
 | 
			
		||||
      }
 | 
			
		||||
      p, div {
 | 
			
		||||
        background-color: #F3F6FA;
 | 
			
		||||
      }
 | 
			
		||||
      div {
 | 
			
		||||
        img {
 | 
			
		||||
          display: none;
 | 
			
		||||
        }
 | 
			
		||||
        &:after {
 | 
			
		||||
          content: "";
 | 
			
		||||
          position: initial;
 | 
			
		||||
          display: block;
 | 
			
		||||
          width: 18px;
 | 
			
		||||
          height: 18px;
 | 
			
		||||
          background: #305680;
 | 
			
		||||
          mask-image: url(/assets/copy-code-icon.svg);
 | 
			
		||||
          mask-repeat: no-repeat;
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      &.multiline {
 | 
			
		||||
    div {
 | 
			
		||||
      background-color: #F3F6FA;
 | 
			
		||||
      &.code-wrapper button.clipboard-btn {
 | 
			
		||||
        right: -2px !important;
 | 
			
		||||
        p {
 | 
			
		||||
          color: $tb-primary-color !important;
 | 
			
		||||
        }
 | 
			
		||||
        p, div {
 | 
			
		||||
          background-color: unset !important;
 | 
			
		||||
        }
 | 
			
		||||
        div {
 | 
			
		||||
          img {
 | 
			
		||||
            display: none;
 | 
			
		||||
          }
 | 
			
		||||
          &:after {
 | 
			
		||||
            content: "";
 | 
			
		||||
            position: initial;
 | 
			
		||||
            display: block;
 | 
			
		||||
            width: 18px;
 | 
			
		||||
            height: 18px;
 | 
			
		||||
            background: $tb-primary-color;
 | 
			
		||||
            mask-image: url(/assets/copy-code-icon.svg);
 | 
			
		||||
            mask-repeat: no-repeat;
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
        &.multiline {
 | 
			
		||||
          right: -2px !important;
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    & > *:not(ul) {
 | 
			
		||||
      padding-right: unset !important;
 | 
			
		||||
      padding-left: unset !important;
 | 
			
		||||
    }
 | 
			
		||||
    pre[class*="language-"] {
 | 
			
		||||
      border: 1px solid $tb-primary-color !important;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  .mdc-button__label > span {
 | 
			
		||||
    .mat-icon {
 | 
			
		||||
@ -109,6 +116,13 @@
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  .mat-mdc-tab-header {
 | 
			
		||||
    position: sticky;
 | 
			
		||||
    top: 0;
 | 
			
		||||
    z-index: 10;
 | 
			
		||||
    background-color: #fff;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  .tabs-icon {
 | 
			
		||||
    margin-right: 8px;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user