Fixed consumer id uniquness
This commit is contained in:
parent
0ae28408c9
commit
7a764778e0
@ -26,8 +26,6 @@ public interface DiscoveryService {
|
|||||||
|
|
||||||
void unpublishCurrentServer();
|
void unpublishCurrentServer();
|
||||||
|
|
||||||
String getNodeId();
|
|
||||||
|
|
||||||
ServerInstance getCurrentServer();
|
ServerInstance getCurrentServer();
|
||||||
|
|
||||||
List<ServerInstance> getOtherServers();
|
List<ServerInstance> getOtherServers();
|
||||||
|
|||||||
@ -38,17 +38,9 @@ public class DummyDiscoveryService implements DiscoveryService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private ServerInstanceService serverInstance;
|
private ServerInstanceService serverInstance;
|
||||||
|
|
||||||
private String nodeId;
|
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
log.info("Initializing...");
|
log.info("Initializing...");
|
||||||
this.nodeId = RandomStringUtils.randomAlphabetic(10);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getNodeId() {
|
|
||||||
return nodeId;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -94,13 +94,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
|
|||||||
private CuratorFramework client;
|
private CuratorFramework client;
|
||||||
private PathChildrenCache cache;
|
private PathChildrenCache cache;
|
||||||
private String nodePath;
|
private String nodePath;
|
||||||
//TODO: make persistent?
|
|
||||||
private String nodeId;
|
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
log.info("Initializing...");
|
log.info("Initializing...");
|
||||||
this.nodeId = RandomStringUtils.randomAlphabetic(10);
|
|
||||||
Assert.hasLength(zkUrl, MiscUtils.missingProperty("zk.url"));
|
Assert.hasLength(zkUrl, MiscUtils.missingProperty("zk.url"));
|
||||||
Assert.notNull(zkRetryInterval, MiscUtils.missingProperty("zk.retry_interval_ms"));
|
Assert.notNull(zkRetryInterval, MiscUtils.missingProperty("zk.retry_interval_ms"));
|
||||||
Assert.notNull(zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms"));
|
Assert.notNull(zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms"));
|
||||||
@ -183,11 +180,6 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getNodeId() {
|
|
||||||
return nodeId;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ServerInstance getCurrentServer() {
|
public ServerInstance getCurrentServer() {
|
||||||
return serverInstance.getSelf();
|
return serverInstance.getSelf();
|
||||||
|
|||||||
@ -28,6 +28,7 @@ import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
|
|||||||
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
|
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
|
||||||
import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
|
import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
|
||||||
import org.thingsboard.server.kafka.TbKafkaSettings;
|
import org.thingsboard.server.kafka.TbKafkaSettings;
|
||||||
|
import org.thingsboard.server.kafka.TbNodeIdProvider;
|
||||||
import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
|
import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
@ -42,7 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||||||
public class RemoteJsInvokeService extends AbstractJsInvokeService {
|
public class RemoteJsInvokeService extends AbstractJsInvokeService {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private DiscoveryService discoveryService;
|
private TbNodeIdProvider nodeIdProvider;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private TbKafkaSettings kafkaSettings;
|
private TbKafkaSettings kafkaSettings;
|
||||||
@ -97,8 +98,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
|
|||||||
|
|
||||||
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<JsInvokeProtos.RemoteJsResponse> responseBuilder = TBKafkaConsumerTemplate.builder();
|
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<JsInvokeProtos.RemoteJsResponse> responseBuilder = TBKafkaConsumerTemplate.builder();
|
||||||
responseBuilder.settings(kafkaSettings);
|
responseBuilder.settings(kafkaSettings);
|
||||||
responseBuilder.topic(responseTopicPrefix + "." + discoveryService.getNodeId());
|
responseBuilder.topic(responseTopicPrefix + "." + nodeIdProvider.getNodeId());
|
||||||
responseBuilder.clientId(discoveryService.getNodeId());
|
responseBuilder.clientId("js-" + nodeIdProvider.getNodeId());
|
||||||
responseBuilder.groupId("rule-engine-node");
|
responseBuilder.groupId("rule-engine-node");
|
||||||
responseBuilder.autoCommit(true);
|
responseBuilder.autoCommit(true);
|
||||||
responseBuilder.autoCommitIntervalMs(autoCommitInterval);
|
responseBuilder.autoCommitIntervalMs(autoCommitInterval);
|
||||||
|
|||||||
@ -35,6 +35,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceAct
|
|||||||
import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
|
import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
|
||||||
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
|
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
|
||||||
import org.thingsboard.server.kafka.TbKafkaSettings;
|
import org.thingsboard.server.kafka.TbKafkaSettings;
|
||||||
|
import org.thingsboard.server.kafka.TbNodeIdProvider;
|
||||||
import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
|
import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
|
||||||
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
|
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
|
||||||
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
|
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
|
||||||
@ -71,7 +72,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
|
|||||||
private TbKafkaSettings kafkaSettings;
|
private TbKafkaSettings kafkaSettings;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private DiscoveryService discoveryService;
|
private TbNodeIdProvider nodeIdProvider;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private ActorSystemContext actorContext;
|
private ActorSystemContext actorContext;
|
||||||
@ -104,7 +105,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
|
|||||||
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<ToRuleEngineMsg> ruleEngineConsumerBuilder = TBKafkaConsumerTemplate.builder();
|
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<ToRuleEngineMsg> ruleEngineConsumerBuilder = TBKafkaConsumerTemplate.builder();
|
||||||
ruleEngineConsumerBuilder.settings(kafkaSettings);
|
ruleEngineConsumerBuilder.settings(kafkaSettings);
|
||||||
ruleEngineConsumerBuilder.topic(ruleEngineTopic);
|
ruleEngineConsumerBuilder.topic(ruleEngineTopic);
|
||||||
ruleEngineConsumerBuilder.clientId(discoveryService.getNodeId());
|
ruleEngineConsumerBuilder.clientId("transport-" + nodeIdProvider.getNodeId());
|
||||||
ruleEngineConsumerBuilder.groupId("tb-node");
|
ruleEngineConsumerBuilder.groupId("tb-node");
|
||||||
ruleEngineConsumerBuilder.autoCommit(true);
|
ruleEngineConsumerBuilder.autoCommit(true);
|
||||||
ruleEngineConsumerBuilder.autoCommitIntervalMs(autoCommitInterval);
|
ruleEngineConsumerBuilder.autoCommitIntervalMs(autoCommitInterval);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user