Merge pull request #919 from janbols/master

RpcBroadcastMsg never arrives at other cluster nodes
This commit is contained in:
Andrew Shvayka 2018-07-09 17:41:46 +03:00 committed by GitHub
commit d32cc45093
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -29,11 +29,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos; import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.discovery.ServerInstance; import org.thingsboard.server.service.cluster.discovery.ServerInstance;
import java.util.HashMap; import java.util.*;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
/** /**
* @author Andrew Shvayka * @author Andrew Shvayka
@ -88,7 +84,17 @@ public class RpcManagerActor extends ContextAwareActor {
private void onMsg(RpcBroadcastMsg msg) { private void onMsg(RpcBroadcastMsg msg) {
log.debug("Forwarding msg to session actors {}", msg); log.debug("Forwarding msg to session actors {}", msg);
sessionActors.keySet().forEach(address -> onMsg(msg.getMsg())); sessionActors.keySet().forEach(address -> {
ClusterAPIProtos.ClusterMessage msgWithServerAddress = msg.getMsg()
.toBuilder()
.setServerAddress(ClusterAPIProtos.ServerAddress
.newBuilder()
.setHost(address.getHost())
.setPort(address.getPort())
.build())
.build();
onMsg(msgWithServerAddress);
});
pendingMsgs.values().forEach(queue -> queue.add(msg.getMsg())); pendingMsgs.values().forEach(queue -> queue.add(msg.getMsg()));
} }