refactoring
This commit is contained in:
parent
6c1031536d
commit
ee8c9deda9
@ -76,7 +76,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
|
||||
private TbApplicationEventListener<ClusterTopologyChangeEvent> clusterTopologyChangeListener = new TbApplicationEventListener<>() {
|
||||
@Override
|
||||
protected void onTbApplicationEvent(ClusterTopologyChangeEvent event) {
|
||||
if (event.getServiceQueueKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals(key.getServiceType()))) {
|
||||
if (event.getQueueKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals(key.getType()))) {
|
||||
/*
|
||||
* If the cluster topology has changed, we need to push all current subscriptions to SubscriptionManagerService again.
|
||||
* Otherwise, the SubscriptionManagerService may "forget" those subscriptions in case of restart.
|
||||
|
||||
@ -74,8 +74,6 @@ public class HashPartitionServiceTest {
|
||||
ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName);
|
||||
TransportProtos.ServiceInfo currentServer = TransportProtos.ServiceInfo.newBuilder()
|
||||
.setServiceId("tb-core-0")
|
||||
// .setTenantIdMSB(TenantId.NULL_UUID.getMostSignificantBits())
|
||||
// .setTenantIdLSB(TenantId.NULL_UUID.getLeastSignificantBits())
|
||||
.addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name()))
|
||||
.build();
|
||||
// when(queueService.resolve(Mockito.any(), Mockito.anyString())).thenAnswer(i -> i.getArguments()[1]);
|
||||
@ -84,8 +82,6 @@ public class HashPartitionServiceTest {
|
||||
for (int i = 1; i < SERVER_COUNT; i++) {
|
||||
otherServers.add(TransportProtos.ServiceInfo.newBuilder()
|
||||
.setServiceId("tb-rule-" + i)
|
||||
// .setTenantIdMSB(TenantId.NULL_UUID.getMostSignificantBits())
|
||||
// .setTenantIdLSB(TenantId.NULL_UUID.getLeastSignificantBits())
|
||||
.addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name()))
|
||||
.build());
|
||||
}
|
||||
|
||||
@ -20,21 +20,12 @@ package transport;
|
||||
option java_package = "org.thingsboard.server.gen.transport";
|
||||
option java_outer_classname = "TransportProtos";
|
||||
|
||||
//message QueueInfo {
|
||||
// string name = 1;
|
||||
// string topic = 2;
|
||||
// int32 partitions = 3;
|
||||
//}
|
||||
|
||||
/**
|
||||
* Service Discovery Data Structures;
|
||||
*/
|
||||
message ServiceInfo {
|
||||
string serviceId = 1;
|
||||
repeated string serviceTypes = 2;
|
||||
// int64 tenantIdMSB = 3;
|
||||
// int64 tenantIdLSB = 4;
|
||||
// repeated QueueInfo ruleEngineQueues = 5;
|
||||
repeated string transports = 6;
|
||||
}
|
||||
|
||||
|
||||
@ -1,62 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.msg.queue;
|
||||
|
||||
import lombok.ToString;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
@ToString
|
||||
public class ServiceQueue {
|
||||
|
||||
public static final String MAIN = "Main";
|
||||
|
||||
private final ServiceType type;
|
||||
private final String queue;
|
||||
|
||||
public ServiceQueue(ServiceType type) {
|
||||
this.type = type;
|
||||
this.queue = MAIN;
|
||||
}
|
||||
|
||||
public ServiceQueue(ServiceType type, String queue) {
|
||||
this.type = type;
|
||||
this.queue = queue != null ? queue : MAIN;
|
||||
}
|
||||
|
||||
public ServiceType getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public String getQueue() {
|
||||
return queue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
ServiceQueue that = (ServiceQueue) o;
|
||||
return type == that.type &&
|
||||
queue.equals(that.queue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(type, queue);
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,48 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.msg.queue;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.ToString;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
@ToString
|
||||
public class ServiceQueueKey {
|
||||
@Getter
|
||||
private final ServiceQueue serviceQueue;
|
||||
|
||||
public ServiceQueueKey(ServiceQueue serviceQueue) {
|
||||
this.serviceQueue = serviceQueue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
ServiceQueueKey that = (ServiceQueueKey) o;
|
||||
return serviceQueue.equals(that.serviceQueue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(serviceQueue);
|
||||
}
|
||||
|
||||
public ServiceType getServiceType() {
|
||||
return serviceQueue.getType();
|
||||
}
|
||||
}
|
||||
@ -48,10 +48,6 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
|
||||
@Value("${service.type:monolith}")
|
||||
private String serviceType;
|
||||
|
||||
@Getter
|
||||
@Value("${service.tenant_id:}")
|
||||
private String tenantIdStr;
|
||||
|
||||
@Autowired
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
|
||||
@ -24,8 +24,6 @@ import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.QueueId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceQueue;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceQueueKey;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
@ -260,9 +258,9 @@ public class HashPartitionService implements PartitionService {
|
||||
if (currentOtherServices == null) {
|
||||
currentOtherServices = new ArrayList<>(otherServices);
|
||||
} else {
|
||||
Set<ServiceQueueKey> changes = new HashSet<>();
|
||||
Map<ServiceQueueKey, List<ServiceInfo>> currentMap = getServiceKeyListMap(currentOtherServices);
|
||||
Map<ServiceQueueKey, List<ServiceInfo>> newMap = getServiceKeyListMap(otherServices);
|
||||
Set<QueueKey> changes = new HashSet<>();
|
||||
Map<QueueKey, List<ServiceInfo>> currentMap = getServiceKeyListMap(currentOtherServices);
|
||||
Map<QueueKey, List<ServiceInfo>> newMap = getServiceKeyListMap(otherServices);
|
||||
currentOtherServices = otherServices;
|
||||
currentMap.forEach((key, list) -> {
|
||||
if (!list.equals(newMap.get(key))) {
|
||||
@ -327,19 +325,17 @@ public class HashPartitionService implements PartitionService {
|
||||
return list == null ? 0 : list.size();
|
||||
}
|
||||
|
||||
private Map<ServiceQueueKey, List<ServiceInfo>> getServiceKeyListMap(List<ServiceInfo> services) {
|
||||
final Map<ServiceQueueKey, List<ServiceInfo>> currentMap = new HashMap<>();
|
||||
private Map<QueueKey, List<ServiceInfo>> getServiceKeyListMap(List<ServiceInfo> services) {
|
||||
final Map<QueueKey, List<ServiceInfo>> currentMap = new HashMap<>();
|
||||
services.forEach(serviceInfo -> {
|
||||
for (String serviceTypeStr : serviceInfo.getServiceTypesList()) {
|
||||
ServiceType serviceType = ServiceType.valueOf(serviceTypeStr.toUpperCase());
|
||||
if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {
|
||||
// for (TransportProtos.QueueInfo queue : serviceInfo.getRuleEngineQueuesList()) {
|
||||
// ServiceQueueKey serviceQueueKey = new ServiceQueueKey(new ServiceQueue(serviceType, queue.getName()), getSystemOrIsolatedTenantId(serviceInfo));
|
||||
// currentMap.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(serviceInfo);
|
||||
// }
|
||||
partitionTopicsMap.keySet().forEach(queueKey ->
|
||||
currentMap.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(serviceInfo));
|
||||
} else {
|
||||
ServiceQueueKey serviceQueueKey = new ServiceQueueKey(new ServiceQueue(serviceType));
|
||||
currentMap.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(serviceInfo);
|
||||
QueueKey queueKey = new QueueKey(serviceType);
|
||||
currentMap.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(serviceInfo);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@ -16,20 +16,19 @@
|
||||
package org.thingsboard.server.queue.discovery.event;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceQueueKey;
|
||||
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
|
||||
public class ClusterTopologyChangeEvent extends TbApplicationEvent {
|
||||
|
||||
private static final long serialVersionUID = -2441739930040282254L;
|
||||
|
||||
@Getter
|
||||
private final Set<ServiceQueueKey> serviceQueueKeys;
|
||||
private final Set<QueueKey> queueKeys;
|
||||
|
||||
public ClusterTopologyChangeEvent(Object source, Set<ServiceQueueKey> serviceQueueKeys) {
|
||||
public ClusterTopologyChangeEvent(Object source, Set<QueueKey> queueKeys) {
|
||||
super(source);
|
||||
this.serviceQueueKeys = serviceQueueKeys;
|
||||
this.queueKeys = queueKeys;
|
||||
}
|
||||
}
|
||||
|
||||
@ -269,8 +269,6 @@ service:
|
||||
type: "${TB_SERVICE_TYPE:tb-transport}"
|
||||
# Unique id for this service (autogenerated if empty)
|
||||
id: "${TB_SERVICE_ID:}"
|
||||
tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
|
||||
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
|
||||
@ -257,8 +257,6 @@ service:
|
||||
type: "${TB_SERVICE_TYPE:tb-transport}"
|
||||
# Unique id for this service (autogenerated if empty)
|
||||
id: "${TB_SERVICE_ID:}"
|
||||
tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
|
||||
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
|
||||
@ -336,8 +336,6 @@ service:
|
||||
type: "${TB_SERVICE_TYPE:tb-transport}"
|
||||
# Unique id for this service (autogenerated if empty)
|
||||
id: "${TB_SERVICE_ID:}"
|
||||
tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
|
||||
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
|
||||
@ -287,7 +287,6 @@ service:
|
||||
type: "${TB_SERVICE_TYPE:tb-transport}"
|
||||
# Unique id for this service (autogenerated if empty)
|
||||
id: "${TB_SERVICE_ID:}"
|
||||
tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
|
||||
@ -237,7 +237,6 @@ service:
|
||||
type: "${TB_SERVICE_TYPE:tb-transport}"
|
||||
# Unique id for this service (autogenerated if empty)
|
||||
id: "${TB_SERVICE_ID:}"
|
||||
tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
|
||||
|
||||
metrics:
|
||||
# Enable/disable actuator metrics.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user