diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index 84f8c36896..968e15ea4b 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -76,7 +76,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer private TbApplicationEventListener clusterTopologyChangeListener = new TbApplicationEventListener<>() { @Override protected void onTbApplicationEvent(ClusterTopologyChangeEvent event) { - if (event.getServiceQueueKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals(key.getServiceType()))) { + if (event.getQueueKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals(key.getType()))) { /* * If the cluster topology has changed, we need to push all current subscriptions to SubscriptionManagerService again. * Otherwise, the SubscriptionManagerService may "forget" those subscriptions in case of restart. diff --git a/application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java index c8fb678470..74738c361f 100644 --- a/application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cluster/routing/HashPartitionServiceTest.java @@ -74,8 +74,6 @@ public class HashPartitionServiceTest { ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName); TransportProtos.ServiceInfo currentServer = TransportProtos.ServiceInfo.newBuilder() .setServiceId("tb-core-0") -// .setTenantIdMSB(TenantId.NULL_UUID.getMostSignificantBits()) -// .setTenantIdLSB(TenantId.NULL_UUID.getLeastSignificantBits()) .addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name())) .build(); // when(queueService.resolve(Mockito.any(), Mockito.anyString())).thenAnswer(i -> i.getArguments()[1]); @@ -84,8 +82,6 @@ public class HashPartitionServiceTest { for (int i = 1; i < SERVER_COUNT; i++) { otherServers.add(TransportProtos.ServiceInfo.newBuilder() .setServiceId("tb-rule-" + i) -// .setTenantIdMSB(TenantId.NULL_UUID.getMostSignificantBits()) -// .setTenantIdLSB(TenantId.NULL_UUID.getLeastSignificantBits()) .addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name())) .build()); } diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 32bcfd688d..50f114856f 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -20,21 +20,12 @@ package transport; option java_package = "org.thingsboard.server.gen.transport"; option java_outer_classname = "TransportProtos"; -//message QueueInfo { -// string name = 1; -// string topic = 2; -// int32 partitions = 3; -//} - /** * Service Discovery Data Structures; */ message ServiceInfo { string serviceId = 1; repeated string serviceTypes = 2; -// int64 tenantIdMSB = 3; -// int64 tenantIdLSB = 4; -// repeated QueueInfo ruleEngineQueues = 5; repeated string transports = 6; } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/ServiceQueue.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/ServiceQueue.java deleted file mode 100644 index 76f4202607..0000000000 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/ServiceQueue.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.common.msg.queue; - -import lombok.ToString; - -import java.util.Objects; - -@ToString -public class ServiceQueue { - - public static final String MAIN = "Main"; - - private final ServiceType type; - private final String queue; - - public ServiceQueue(ServiceType type) { - this.type = type; - this.queue = MAIN; - } - - public ServiceQueue(ServiceType type, String queue) { - this.type = type; - this.queue = queue != null ? queue : MAIN; - } - - public ServiceType getType() { - return type; - } - - public String getQueue() { - return queue; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ServiceQueue that = (ServiceQueue) o; - return type == that.type && - queue.equals(that.queue); - } - - @Override - public int hashCode() { - return Objects.hash(type, queue); - } - -} diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/ServiceQueueKey.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/ServiceQueueKey.java deleted file mode 100644 index f47f5ab8b1..0000000000 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/ServiceQueueKey.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.common.msg.queue; - -import lombok.Getter; -import lombok.ToString; - -import java.util.Objects; - -@ToString -public class ServiceQueueKey { - @Getter - private final ServiceQueue serviceQueue; - - public ServiceQueueKey(ServiceQueue serviceQueue) { - this.serviceQueue = serviceQueue; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ServiceQueueKey that = (ServiceQueueKey) o; - return serviceQueue.equals(that.serviceQueue); - } - - @Override - public int hashCode() { - return Objects.hash(serviceQueue); - } - - public ServiceType getServiceType() { - return serviceQueue.getType(); - } -} \ No newline at end of file diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java index 78ed1175ba..a3744c3946 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java @@ -48,10 +48,6 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { @Value("${service.type:monolith}") private String serviceType; - @Getter - @Value("${service.tenant_id:}") - private String tenantIdStr; - @Autowired private ApplicationContext applicationContext; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 5a522f9233..ccf8b9dd92 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -24,8 +24,6 @@ import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.msg.queue.ServiceQueue; -import org.thingsboard.server.common.msg.queue.ServiceQueueKey; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos; @@ -260,9 +258,9 @@ public class HashPartitionService implements PartitionService { if (currentOtherServices == null) { currentOtherServices = new ArrayList<>(otherServices); } else { - Set changes = new HashSet<>(); - Map> currentMap = getServiceKeyListMap(currentOtherServices); - Map> newMap = getServiceKeyListMap(otherServices); + Set changes = new HashSet<>(); + Map> currentMap = getServiceKeyListMap(currentOtherServices); + Map> newMap = getServiceKeyListMap(otherServices); currentOtherServices = otherServices; currentMap.forEach((key, list) -> { if (!list.equals(newMap.get(key))) { @@ -327,19 +325,17 @@ public class HashPartitionService implements PartitionService { return list == null ? 0 : list.size(); } - private Map> getServiceKeyListMap(List services) { - final Map> currentMap = new HashMap<>(); + private Map> getServiceKeyListMap(List services) { + final Map> currentMap = new HashMap<>(); services.forEach(serviceInfo -> { for (String serviceTypeStr : serviceInfo.getServiceTypesList()) { ServiceType serviceType = ServiceType.valueOf(serviceTypeStr.toUpperCase()); if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) { -// for (TransportProtos.QueueInfo queue : serviceInfo.getRuleEngineQueuesList()) { -// ServiceQueueKey serviceQueueKey = new ServiceQueueKey(new ServiceQueue(serviceType, queue.getName()), getSystemOrIsolatedTenantId(serviceInfo)); -// currentMap.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(serviceInfo); -// } + partitionTopicsMap.keySet().forEach(queueKey -> + currentMap.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(serviceInfo)); } else { - ServiceQueueKey serviceQueueKey = new ServiceQueueKey(new ServiceQueue(serviceType)); - currentMap.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(serviceInfo); + QueueKey queueKey = new QueueKey(serviceType); + currentMap.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(serviceInfo); } } }); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/ClusterTopologyChangeEvent.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/ClusterTopologyChangeEvent.java index 8aadf8515d..fd0911423a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/ClusterTopologyChangeEvent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/ClusterTopologyChangeEvent.java @@ -16,20 +16,19 @@ package org.thingsboard.server.queue.discovery.event; import lombok.Getter; -import org.thingsboard.server.common.msg.queue.ServiceQueueKey; +import org.thingsboard.server.queue.discovery.QueueKey; import java.util.Set; - public class ClusterTopologyChangeEvent extends TbApplicationEvent { private static final long serialVersionUID = -2441739930040282254L; @Getter - private final Set serviceQueueKeys; + private final Set queueKeys; - public ClusterTopologyChangeEvent(Object source, Set serviceQueueKeys) { + public ClusterTopologyChangeEvent(Object source, Set queueKeys) { super(source); - this.serviceQueueKeys = serviceQueueKeys; + this.queueKeys = queueKeys; } } diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 39cb9b35d0..51a4bf4492 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -269,8 +269,6 @@ service: type: "${TB_SERVICE_TYPE:tb-transport}" # Unique id for this service (autogenerated if empty) id: "${TB_SERVICE_ID:}" - tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id. - metrics: # Enable/disable actuator metrics. diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 29d8a1d381..7574687ecc 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -257,8 +257,6 @@ service: type: "${TB_SERVICE_TYPE:tb-transport}" # Unique id for this service (autogenerated if empty) id: "${TB_SERVICE_ID:}" - tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id. - metrics: # Enable/disable actuator metrics. diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 361ac7c651..19d70e0bc2 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -336,8 +336,6 @@ service: type: "${TB_SERVICE_TYPE:tb-transport}" # Unique id for this service (autogenerated if empty) id: "${TB_SERVICE_ID:}" - tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id. - metrics: # Enable/disable actuator metrics. diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 1b84701ccf..1a6f04e598 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -287,7 +287,6 @@ service: type: "${TB_SERVICE_TYPE:tb-transport}" # Unique id for this service (autogenerated if empty) id: "${TB_SERVICE_ID:}" - tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id. metrics: # Enable/disable actuator metrics. diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 7562af359b..6ff1a37442 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -237,7 +237,6 @@ service: type: "${TB_SERVICE_TYPE:tb-transport}" # Unique id for this service (autogenerated if empty) id: "${TB_SERVICE_ID:}" - tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id. metrics: # Enable/disable actuator metrics.