diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 19804c0588..1f16fbc414 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -96,7 +96,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" cluster: stats: diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java index 50378d3387..44999d016a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ZkDiscoveryService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.queue.discovery; import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.ProtocolStringList; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; @@ -68,7 +69,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi private Integer zkSessionTimeout; @Value("${zk.zk_dir}") private String zkDir; - @Value("${zk.recalculate_delay:120000}") + @Value("${zk.recalculate_delay:60000}") private Long recalculateDelay; protected final ConcurrentHashMap> delayedTasks; @@ -294,35 +295,39 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi log.error("Failed to decode server instance for node {}", data.getPath(), e); throw e; } - log.debug("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), instance.getServiceId()); + + String serviceId = instance.getServiceId(); + ProtocolStringList serviceTypesList = instance.getServiceTypesList(); + + log.trace("Processing [{}] event for [{}]", pathChildrenCacheEvent.getType(), serviceId); switch (pathChildrenCacheEvent.getType()) { case CHILD_ADDED: - ScheduledFuture task = delayedTasks.remove(instance.getServiceId()); + ScheduledFuture task = delayedTasks.remove(serviceId); if (task != null) { if (task.cancel(false)) { log.debug("[{}] Recalculate partitions ignored. Service was restarted in time [{}].", - instance.getServiceId(), instance.getServiceTypesList()); + serviceId, serviceTypesList); } else { log.debug("[{}] Going to recalculate partitions. Service was not restarted in time [{}]!", - instance.getServiceId(), instance.getServiceTypesList()); + serviceId, serviceTypesList); recalculatePartitions(); } } else { - log.debug("[{}] Going to recalculate partitions due to adding new node [{}].", - instance.getServiceId(), instance.getServiceTypesList()); + log.trace("[{}] Going to recalculate partitions due to adding new node [{}].", + serviceId, serviceTypesList); recalculatePartitions(); } break; case CHILD_REMOVED: ScheduledFuture future = zkExecutorService.schedule(() -> { log.debug("[{}] Going to recalculate partitions due to removed node [{}]", - instance.getServiceId(), instance.getServiceTypesList()); - ScheduledFuture removedTask = delayedTasks.remove(instance.getServiceId()); + serviceId, serviceTypesList); + ScheduledFuture removedTask = delayedTasks.remove(serviceId); if (removedTask != null) { recalculatePartitions(); } }, recalculateDelay, TimeUnit.MILLISECONDS); - delayedTasks.put(instance.getServiceId(), future); + delayedTasks.put(serviceId, future); break; default: break; @@ -334,6 +339,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi * Synchronized to ensure that other servers info is up to date * */ synchronized void recalculatePartitions() { + delayedTasks.values().forEach(future -> future.cancel(false)); delayedTasks.clear(); partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), getOtherServers()); } diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java index 38cad217aa..a8810efd0e 100644 --- a/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java +++ b/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java @@ -63,68 +63,76 @@ public class ZkDiscoveryServiceTest { @Mock private PathChildrenCache cache; - private ScheduledExecutorService zkExecutorService; - @Mock private CuratorFramework curatorFramework; private ZkDiscoveryService zkDiscoveryService; + private static final long RECALCULATE_DELAY = 100L; + + final TransportProtos.ServiceInfo currentInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-rule-engine-0").build(); + final ChildData currentData = new ChildData("/thingsboard/nodes/0000000010", null, currentInfo.toByteArray()); + final TransportProtos.ServiceInfo childInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-rule-engine-1").build(); + final ChildData childData = new ChildData("/thingsboard/nodes/0000000020", null, childInfo.toByteArray()); + @Before public void setup() { zkDiscoveryService = Mockito.spy(new ZkDiscoveryService(serviceInfoProvider, partitionService)); - zkExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("zk-discovery")); + ScheduledExecutorService zkExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("zk-discovery")); when(client.getState()).thenReturn(CuratorFrameworkState.STARTED); ReflectionTestUtils.setField(zkDiscoveryService, "stopped", false); ReflectionTestUtils.setField(zkDiscoveryService, "client", client); ReflectionTestUtils.setField(zkDiscoveryService, "cache", cache); ReflectionTestUtils.setField(zkDiscoveryService, "nodePath", "/thingsboard/nodes/0000000010"); ReflectionTestUtils.setField(zkDiscoveryService, "zkExecutorService", zkExecutorService); - ReflectionTestUtils.setField(zkDiscoveryService, "recalculateDelay", 1000L); + ReflectionTestUtils.setField(zkDiscoveryService, "recalculateDelay", RECALCULATE_DELAY); ReflectionTestUtils.setField(zkDiscoveryService, "zkDir", "/thingsboard"); - } - - @Test - public void restartNodeTest() throws Exception { - var currentInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("currentId").build(); - var currentData = new ChildData("/thingsboard/nodes/0000000010", null, currentInfo.toByteArray()); - var childInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("childId").build(); - var childData = new ChildData("/thingsboard/nodes/0000000020", null, childInfo.toByteArray()); when(serviceInfoProvider.getServiceInfo()).thenReturn(currentInfo); + List dataList = new ArrayList<>(); dataList.add(currentData); when(cache.getCurrentData()).thenReturn(dataList); + } + @Test + public void restartNodeInTimeTest() throws Exception { startNode(childData); verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); reset(partitionService); - //Restart in timeAssert.assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); stopNode(childData); assertEquals(1, zkDiscoveryService.delayedTasks.size()); - verify(partitionService, never()).recalculatePartitions(eq(currentInfo), any()); + verify(partitionService, never()).recalculatePartitions(any(), any()); startNode(childData); - verify(partitionService, never()).recalculatePartitions(eq(currentInfo), any()); + verify(partitionService, never()).recalculatePartitions(any(), any()); - Thread.sleep(2000); + Thread.sleep(RECALCULATE_DELAY * 2); - verify(partitionService, never()).recalculatePartitions(eq(currentInfo), any()); + verify(partitionService, never()).recalculatePartitions(any(), any()); assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); + } + + @Test + public void restartNodeNotInTimeTest() throws Exception { + startNode(childData); + + verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); + + reset(partitionService); - //Restart not in time stopNode(childData); assertEquals(1, zkDiscoveryService.delayedTasks.size()); - Thread.sleep(2000); + Thread.sleep(RECALCULATE_DELAY * 2); assertTrue(zkDiscoveryService.delayedTasks.isEmpty()); @@ -135,11 +143,19 @@ public class ZkDiscoveryServiceTest { verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); reset(partitionService); + } - //Start another node during restart - var anotherInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("anotherId").build(); + @Test + public void startAnotherNodeDuringRestartTest() throws Exception { + var anotherInfo = TransportProtos.ServiceInfo.newBuilder().setServiceId("tb-transport").build(); var anotherData = new ChildData("/thingsboard/nodes/0000000030", null, anotherInfo.toByteArray()); + startNode(childData); + + verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(childInfo))); + + reset(partitionService); + stopNode(childData); assertEquals(1, zkDiscoveryService.delayedTasks.size()); @@ -151,9 +167,9 @@ public class ZkDiscoveryServiceTest { verify(partitionService, times(1)).recalculatePartitions(eq(currentInfo), eq(List.of(anotherInfo))); reset(partitionService); - Thread.sleep(2000); + Thread.sleep(RECALCULATE_DELAY * 2); - verify(partitionService, never()).recalculatePartitions(eq(currentInfo), any()); + verify(partitionService, never()).recalculatePartitions(any(), any()); startNode(childData); diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 0dbb19a71a..66c6b4d3da 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" queue: type: "${TB_QUEUE_TYPE:kafka}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index c8f4b5a099..f4b5e0bc94 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index fe181f12f2..f92da86b99 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -68,7 +68,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index d80279f582..05388473f0 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index fcbf542287..e131788929 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" cache: type: "${CACHE_TYPE:redis}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 0e84d54fce..a7928eb49f 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -41,7 +41,7 @@ zk: session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" # Name of the directory in zookeeper 'filesystem' zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" - recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:120000}" + recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:60000}" cache: type: "${CACHE_TYPE:redis}"