From 7a764778e013519edf8a4e90d23a4b1a4def4e23 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Thu, 18 Oct 2018 16:23:35 +0300 Subject: [PATCH] Fixed consumer id uniquness --- .../service/cluster/discovery/DiscoveryService.java | 2 -- .../service/cluster/discovery/DummyDiscoveryService.java | 8 -------- .../service/cluster/discovery/ZkDiscoveryService.java | 8 -------- .../server/service/script/RemoteJsInvokeService.java | 7 ++++--- .../transport/RemoteRuleEngineTransportService.java | 5 +++-- 5 files changed, 7 insertions(+), 23 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java index 04c4cedabd..f9caafa412 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java @@ -26,8 +26,6 @@ public interface DiscoveryService { void unpublishCurrentServer(); - String getNodeId(); - ServerInstance getCurrentServer(); List getOtherServers(); diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java index 859a7af4fc..358c8477d8 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java @@ -38,17 +38,9 @@ public class DummyDiscoveryService implements DiscoveryService { @Autowired private ServerInstanceService serverInstance; - private String nodeId; - @PostConstruct public void init() { log.info("Initializing..."); - this.nodeId = RandomStringUtils.randomAlphabetic(10); - } - - @Override - public String getNodeId() { - return nodeId; } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java index b4829f27aa..bb92642970 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java @@ -94,13 +94,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi private CuratorFramework client; private PathChildrenCache cache; private String nodePath; - //TODO: make persistent? - private String nodeId; @PostConstruct public void init() { log.info("Initializing..."); - this.nodeId = RandomStringUtils.randomAlphabetic(10); Assert.hasLength(zkUrl, MiscUtils.missingProperty("zk.url")); Assert.notNull(zkRetryInterval, MiscUtils.missingProperty("zk.retry_interval_ms")); Assert.notNull(zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms")); @@ -183,11 +180,6 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi } } - @Override - public String getNodeId() { - return nodeId; - } - @Override public ServerInstance getCurrentServer() { return serverInstance.getSelf(); diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index b334ff15ae..db31bda942 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -28,6 +28,7 @@ import org.thingsboard.server.kafka.TBKafkaConsumerTemplate; import org.thingsboard.server.kafka.TBKafkaProducerTemplate; import org.thingsboard.server.kafka.TbKafkaRequestTemplate; import org.thingsboard.server.kafka.TbKafkaSettings; +import org.thingsboard.server.kafka.TbNodeIdProvider; import org.thingsboard.server.service.cluster.discovery.DiscoveryService; import javax.annotation.PostConstruct; @@ -42,7 +43,7 @@ import java.util.concurrent.ConcurrentHashMap; public class RemoteJsInvokeService extends AbstractJsInvokeService { @Autowired - private DiscoveryService discoveryService; + private TbNodeIdProvider nodeIdProvider; @Autowired private TbKafkaSettings kafkaSettings; @@ -97,8 +98,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder responseBuilder = TBKafkaConsumerTemplate.builder(); responseBuilder.settings(kafkaSettings); - responseBuilder.topic(responseTopicPrefix + "." + discoveryService.getNodeId()); - responseBuilder.clientId(discoveryService.getNodeId()); + responseBuilder.topic(responseTopicPrefix + "." + nodeIdProvider.getNodeId()); + responseBuilder.clientId("js-" + nodeIdProvider.getNodeId()); responseBuilder.groupId("rule-engine-node"); responseBuilder.autoCommit(true); responseBuilder.autoCommitIntervalMs(autoCommitInterval); diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java index 4f8860daf7..38db1a415b 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java @@ -35,6 +35,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceAct import org.thingsboard.server.kafka.TBKafkaConsumerTemplate; import org.thingsboard.server.kafka.TBKafkaProducerTemplate; import org.thingsboard.server.kafka.TbKafkaSettings; +import org.thingsboard.server.kafka.TbNodeIdProvider; import org.thingsboard.server.service.cluster.discovery.DiscoveryService; import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; @@ -71,7 +72,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ private TbKafkaSettings kafkaSettings; @Autowired - private DiscoveryService discoveryService; + private TbNodeIdProvider nodeIdProvider; @Autowired private ActorSystemContext actorContext; @@ -104,7 +105,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder ruleEngineConsumerBuilder = TBKafkaConsumerTemplate.builder(); ruleEngineConsumerBuilder.settings(kafkaSettings); ruleEngineConsumerBuilder.topic(ruleEngineTopic); - ruleEngineConsumerBuilder.clientId(discoveryService.getNodeId()); + ruleEngineConsumerBuilder.clientId("transport-" + nodeIdProvider.getNodeId()); ruleEngineConsumerBuilder.groupId("tb-node"); ruleEngineConsumerBuilder.autoCommit(true); ruleEngineConsumerBuilder.autoCommitIntervalMs(autoCommitInterval);