Fixed transport
This commit is contained in:
parent
b5bedbfdf6
commit
a779839081
@ -32,6 +32,7 @@ import lombok.Setter;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Lazy;
|
||||||
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
@ -233,6 +234,7 @@ public class ActorSystemContext {
|
|||||||
/**
|
/**
|
||||||
* The following Service will be null if we operate in tb-core mode
|
* The following Service will be null if we operate in tb-core mode
|
||||||
*/
|
*/
|
||||||
|
@Lazy
|
||||||
@Autowired(required = false)
|
@Autowired(required = false)
|
||||||
@Getter
|
@Getter
|
||||||
private TbRuleEngineDeviceRpcService tbRuleEngineDeviceRpcService;
|
private TbRuleEngineDeviceRpcService tbRuleEngineDeviceRpcService;
|
||||||
@ -240,6 +242,7 @@ public class ActorSystemContext {
|
|||||||
/**
|
/**
|
||||||
* The following Service will be null if we operate in tb-rule-engine mode
|
* The following Service will be null if we operate in tb-rule-engine mode
|
||||||
*/
|
*/
|
||||||
|
@Lazy
|
||||||
@Autowired(required = false)
|
@Autowired(required = false)
|
||||||
@Getter
|
@Getter
|
||||||
private TbCoreDeviceRpcService tbCoreDeviceRpcService;
|
private TbCoreDeviceRpcService tbCoreDeviceRpcService;
|
||||||
|
|||||||
@ -544,6 +544,9 @@ public class DefaultTransportService implements TransportService {
|
|||||||
|
|
||||||
protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
|
protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, getTenantId(sessionInfo), getDeviceId(sessionInfo));
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, getTenantId(sessionInfo), getDeviceId(sessionInfo));
|
||||||
|
if (log.isTraceEnabled()) {
|
||||||
|
log.trace("[{}][{}] Pushing to topic {} message {}", getTenantId(sessionInfo), getDeviceId(sessionInfo), tpi.getFullTopicName(), toDeviceActorMsg);
|
||||||
|
}
|
||||||
tbCoreMsgProducer.send(tpi,
|
tbCoreMsgProducer.send(tpi,
|
||||||
new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
|
new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
|
||||||
ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), callback != null ?
|
ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), callback != null ?
|
||||||
@ -552,6 +555,9 @@ public class DefaultTransportService implements TransportService {
|
|||||||
|
|
||||||
protected void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
|
protected void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tbMsg.getOriginator());
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tbMsg.getOriginator());
|
||||||
|
if (log.isTraceEnabled()) {
|
||||||
|
log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg);
|
||||||
|
}
|
||||||
ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg))
|
ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg))
|
||||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
|
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
|
||||||
|
|||||||
@ -1,3 +1,5 @@
|
|||||||
|
ZOOKEEPER_ENABLED=true
|
||||||
|
ZOOKEEPER_URL=zookeeper:2181
|
||||||
|
|
||||||
COAP_BIND_ADDRESS=0.0.0.0
|
COAP_BIND_ADDRESS=0.0.0.0
|
||||||
COAP_BIND_PORT=5683
|
COAP_BIND_PORT=5683
|
||||||
|
|||||||
@ -1,3 +1,5 @@
|
|||||||
|
ZOOKEEPER_ENABLED=true
|
||||||
|
ZOOKEEPER_URL=zookeeper:2181
|
||||||
|
|
||||||
HTTP_BIND_ADDRESS=0.0.0.0
|
HTTP_BIND_ADDRESS=0.0.0.0
|
||||||
HTTP_BIND_PORT=8081
|
HTTP_BIND_PORT=8081
|
||||||
|
|||||||
@ -1,3 +1,5 @@
|
|||||||
|
ZOOKEEPER_ENABLED=true
|
||||||
|
ZOOKEEPER_URL=zookeeper:2181
|
||||||
|
|
||||||
MQTT_BIND_ADDRESS=0.0.0.0
|
MQTT_BIND_ADDRESS=0.0.0.0
|
||||||
MQTT_BIND_PORT=1883
|
MQTT_BIND_PORT=1883
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user