refactored due to comments

This commit is contained in:
YevhenBondarenko 2023-07-28 14:20:33 +02:00
parent 75b3882782
commit d9c39c362e
9 changed files with 62 additions and 40 deletions

View File

@ -96,7 +96,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem' # Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}"
cluster: cluster:
stats: stats:

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.queue.discovery; package org.thingsboard.server.queue.discovery;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ProtocolStringList;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -68,7 +69,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
private Integer zkSessionTimeout; private Integer zkSessionTimeout;
@Value("${zk.zk_dir}") @Value("${zk.zk_dir}")
private String zkDir; private String zkDir;
@Value("${zk.recalculate_delay:120000}") @Value("${zk.recalculate_delay:60000}")
private Long recalculateDelay; private Long recalculateDelay;
protected final ConcurrentHashMap<String, ScheduledFuture<?>> delayedTasks; protected final ConcurrentHashMap<String, ScheduledFuture<?>> delayedTasks;
@ -294,35 +295,39 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
log.error("Failed to decode server instance for node {}", data.getPath(), e); log.error("Failed to decode server instance for node {}", data.getPath(), e);
throw e; throw e;
} }
log.debug("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), instance.getServiceId());
String serviceId = instance.getServiceId();
ProtocolStringList serviceTypesList = instance.getServiceTypesList();
log.trace("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), serviceId);
switch (pathChildrenCacheEvent.getType()) { switch (pathChildrenCacheEvent.getType()) {
case CHILD_ADDED: case CHILD_ADDED:
ScheduledFuture<?> task = delayedTasks.remove(instance.getServiceId()); ScheduledFuture<?> task = delayedTasks.remove(serviceId);
if (task != null) { if (task != null) {
if (task.cancel(false)) { if (task.cancel(false)) {
log.debug("[{}] Recalculate partitions ignored. Service was restarted in time [{}].", log.debug("[{}] Recalculate partitions ignored. Service was restarted in time [{}].",
instance.getServiceId(), instance.getServiceTypesList()); serviceId, serviceTypesList);
} else { } else {
log.debug("[{}] Going to recalculate partitions. Service was not restarted in time [{}]!", log.debug("[{}] Going to recalculate partitions. Service was not restarted in time [{}]!",
instance.getServiceId(), instance.getServiceTypesList()); serviceId, serviceTypesList);
recalculatePartitions(); recalculatePartitions();
} }
} else { } else {
log.debug("[{}] Going to recalculate partitions due to adding new node [{}].", log.trace("[{}] Going to recalculate partitions due to adding new node [{}].",
instance.getServiceId(), instance.getServiceTypesList()); serviceId, serviceTypesList);
recalculatePartitions(); recalculatePartitions();
} }
break; break;
case CHILD_REMOVED: case CHILD_REMOVED:
ScheduledFuture<?> future = zkExecutorService.schedule(() -> { ScheduledFuture<?> future = zkExecutorService.schedule(() -> {
log.debug("[{}] Going to recalculate partitions due to removed node [{}]", log.debug("[{}] Going to recalculate partitions due to removed node [{}]",
instance.getServiceId(), instance.getServiceTypesList()); serviceId, serviceTypesList);
ScheduledFuture<?> removedTask = delayedTasks.remove(instance.getServiceId()); ScheduledFuture<?> removedTask = delayedTasks.remove(serviceId);
if (removedTask != null) { if (removedTask != null) {
recalculatePartitions(); recalculatePartitions();
} }
}, recalculateDelay, TimeUnit.MILLISECONDS); }, recalculateDelay, TimeUnit.MILLISECONDS);
delayedTasks.put(instance.getServiceId(), future); delayedTasks.put(serviceId, future);
break; break;
default: default:
break; break;
@ -334,6 +339,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
* Synchronized to ensure that other servers info is up to date * Synchronized to ensure that other servers info is up to date
* */ * */
synchronized void recalculatePartitions() { synchronized void recalculatePartitions() {
delayedTasks.values().forEach(future -> future.cancel(false));
delayedTasks.clear(); delayedTasks.clear();
partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), getOtherServers()); partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), getOtherServers());
} }

View File

@ -63,68 +63,76 @@ public class ZkDiscoveryServiceTest {
@Mock @Mock
private PathChildrenCache cache; private PathChildrenCache cache;
private ScheduledExecutorService zkExecutorService;
@Mock @Mock
private CuratorFramework curatorFramework; private CuratorFramework curatorFramework;
private ZkDiscoveryService zkDiscoveryService; private ZkDiscoveryService zkDiscoveryService;
private static final long RECALCULATE_DELAY = 100L;
final TransportProtos.ServiceInfo currentInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-rule-engine-0").build();
final ChildData currentData = new ChildData("/thingsboard/nodes/0000000010", null, currentInfo.toByteArray());
final TransportProtos.ServiceInfo childInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-rule-engine-1").build();
final ChildData childData = new ChildData("/thingsboard/nodes/0000000020", null, childInfo.toByteArray());
@Before @Before
public void setup() { public void setup() {
zkDiscoveryService = Mockito.spy(new ZkDiscoveryService(serviceInfoProvider, partitionService)); zkDiscoveryService = Mockito.spy(new ZkDiscoveryService(serviceInfoProvider, partitionService));
zkExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("zk-discovery")); ScheduledExecutorService zkExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("zk-discovery"));
when(client.getState()).thenReturn(CuratorFrameworkState.STARTED); when(client.getState()).thenReturn(CuratorFrameworkState.STARTED);
ReflectionTestUtils.setField(zkDiscoveryService, "stopped", false); ReflectionTestUtils.setField(zkDiscoveryService, "stopped", false);
ReflectionTestUtils.setField(zkDiscoveryService, "client", client); ReflectionTestUtils.setField(zkDiscoveryService, "client", client);
ReflectionTestUtils.setField(zkDiscoveryService, "cache", cache); ReflectionTestUtils.setField(zkDiscoveryService, "cache", cache);
ReflectionTestUtils.setField(zkDiscoveryService, "nodePath", "/thingsboard/nodes/0000000010"); ReflectionTestUtils.setField(zkDiscoveryService, "nodePath", "/thingsboard/nodes/0000000010");
ReflectionTestUtils.setField(zkDiscoveryService, "zkExecutorService", zkExecutorService); ReflectionTestUtils.setField(zkDiscoveryService, "zkExecutorService", zkExecutorService);
ReflectionTestUtils.setField(zkDiscoveryService, "recalculateDelay", 1000L); ReflectionTestUtils.setField(zkDiscoveryService, "recalculateDelay", RECALCULATE_DELAY);
ReflectionTestUtils.setField(zkDiscoveryService, "zkDir", "/thingsboard"); ReflectionTestUtils.setField(zkDiscoveryService, "zkDir", "/thingsboard");
}
@Test
public void restartNodeTest() throws Exception {
var currentInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("currentId").build();
var currentData = new ChildData("/thingsboard/nodes/0000000010", null, currentInfo.toByteArray());
var childInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("childId").build();
var childData = new ChildData("/thingsboard/nodes/0000000020", null, childInfo.toByteArray());
when(serviceInfoProvider.getServiceInfo()).thenReturn(currentInfo); when(serviceInfoProvider.getServiceInfo()).thenReturn(currentInfo);
List<ChildData> dataList = new ArrayList<>(); List<ChildData> dataList = new ArrayList<>();
dataList.add(currentData); dataList.add(currentData);
when(cache.getCurrentData()).thenReturn(dataList); when(cache.getCurrentData()).thenReturn(dataList);
}
@Test
public void restartNodeInTimeTest() throws Exception {
startNode(childData); startNode(childData);
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo)));
reset(partitionService); reset(partitionService);
//Restart in timeAssert.assertTrue(zkDiscoveryService.delayedTasks.isEmpty());
stopNode(childData); stopNode(childData);
assertEquals(1, zkDiscoveryService.delayedTasks.size()); assertEquals(1, zkDiscoveryService.delayedTasks.size());
verify(partitionService, never()).recalculatePartitions(eq(currentInfo), any()); verify(partitionService, never()).recalculatePartitions(any(), any());
startNode(childData); startNode(childData);
verify(partitionService, never()).recalculatePartitions(eq(currentInfo), any()); verify(partitionService, never()).recalculatePartitions(any(), any());
Thread.sleep(2000); Thread.sleep(RECALCULATE_DELAY * 2);
verify(partitionService, never()).recalculatePartitions(eq(currentInfo), any()); verify(partitionService, never()).recalculatePartitions(any(), any());
assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); assertTrue(zkDiscoveryService.delayedTasks.isEmpty());
}
@Test
public void restartNodeNotInTimeTest() throws Exception {
startNode(childData);
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo)));
reset(partitionService);
//Restart not in time
stopNode(childData); stopNode(childData);
assertEquals(1, zkDiscoveryService.delayedTasks.size()); assertEquals(1, zkDiscoveryService.delayedTasks.size());
Thread.sleep(2000); Thread.sleep(RECALCULATE_DELAY * 2);
assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); assertTrue(zkDiscoveryService.delayedTasks.isEmpty());
@ -135,11 +143,19 @@ public class ZkDiscoveryServiceTest {
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo)));
reset(partitionService); reset(partitionService);
}
//Start another node during restart @Test
var anotherInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("anotherId").build(); public void startAnotherNodeDuringRestartTest() throws Exception {
var anotherInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-transport").build();
var anotherData = new ChildData("/thingsboard/nodes/0000000030", null, anotherInfo.toByteArray()); var anotherData = new ChildData("/thingsboard/nodes/0000000030", null, anotherInfo.toByteArray());
startNode(childData);
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo)));
reset(partitionService);
stopNode(childData); stopNode(childData);
assertEquals(1, zkDiscoveryService.delayedTasks.size()); assertEquals(1, zkDiscoveryService.delayedTasks.size());
@ -151,9 +167,9 @@ public class ZkDiscoveryServiceTest {
verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(anotherInfo))); verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(anotherInfo)));
reset(partitionService); reset(partitionService);
Thread.sleep(2000); Thread.sleep(RECALCULATE_DELAY * 2);
verify(partitionService, never()).recalculatePartitions(eq(currentInfo), any()); verify(partitionService, never()).recalculatePartitions(any(), any());
startNode(childData); startNode(childData);

View File

@ -41,7 +41,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem' # Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}"
queue: queue:
type: "${TB_QUEUE_TYPE:kafka}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) type: "${TB_QUEUE_TYPE:kafka}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)

View File

@ -41,7 +41,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem' # Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}"
cache: cache:
type: "${CACHE_TYPE:redis}" type: "${CACHE_TYPE:redis}"

View File

@ -68,7 +68,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem' # Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}"
cache: cache:
type: "${CACHE_TYPE:redis}" type: "${CACHE_TYPE:redis}"

View File

@ -41,7 +41,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem' # Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}"
cache: cache:
type: "${CACHE_TYPE:redis}" type: "${CACHE_TYPE:redis}"

View File

@ -41,7 +41,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem' # Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}"
cache: cache:
type: "${CACHE_TYPE:redis}" type: "${CACHE_TYPE:redis}"

View File

@ -41,7 +41,7 @@ zk:
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem' # Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}"
cache: cache:
type: "${CACHE_TYPE:redis}" type: "${CACHE_TYPE:redis}"