Improved logging

This commit is contained in:
Andrew Shvayka 2018-11-01 18:02:09 +02:00
parent 162029939b
commit 5d5100ef1b
20 changed files with 157 additions and 51 deletions

View File

@ -51,7 +51,6 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@Slf4j
public class AppActor extends RuleChainManagerActor { public class AppActor extends RuleChainManagerActor {
private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID); private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);

View File

@ -15,7 +15,6 @@
*/ */
package org.thingsboard.server.actors.device; package org.thingsboard.server.actors.device;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg; import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.ActorSystemContext;
@ -29,7 +28,6 @@ import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg; import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
@Slf4j
public class DeviceActor extends ContextAwareActor { public class DeviceActor extends ContextAwareActor {
private final DeviceActorMessageProcessor processor; private final DeviceActorMessageProcessor processor;

View File

@ -348,9 +348,12 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
int requestId = msg.getMsg().getRequestId(); int requestId = msg.getMsg().getRequestId();
ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(requestId); ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(requestId);
if (data != null) { if (data != null) {
log.debug("[{}] Pushing reply to [{}][{}]!", deviceId, data.getNodeId(), data.getSessionId());
sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder() sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder()
.setRequestId(requestId).setPayload(msg.getMsg().getData()).build() .setRequestId(requestId).setPayload(msg.getMsg().getData()).build()
, data.getSessionId(), data.getNodeId()); , data.getSessionId(), data.getNodeId());
} else {
log.debug("[{}][{}] Pending RPC request to server not found!", deviceId, requestId);
} }
} }

View File

@ -40,7 +40,7 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
@Override @Override
public void onConnected(GrpcSession session) { public void onConnected(GrpcSession session) {
log.info("{} session started -> {}", getType(session), session.getRemoteServer()); log.info("[{}][{}] session started", session.getRemoteServer(), getType(session));
if (!session.isClient()) { if (!session.isClient()) {
manager.tell(new RpcSessionConnectedMsg(session.getRemoteServer(), session.getSessionId()), self); manager.tell(new RpcSessionConnectedMsg(session.getRemoteServer(), session.getSessionId()), self);
} }
@ -48,21 +48,19 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
@Override @Override
public void onDisconnected(GrpcSession session) { public void onDisconnected(GrpcSession session) {
log.info("{} session closed -> {}", getType(session), session.getRemoteServer()); log.info("[{}][{}] session closed", session.getRemoteServer(), getType(session));
manager.tell(new RpcSessionDisconnectedMsg(session.isClient(), session.getRemoteServer()), self); manager.tell(new RpcSessionDisconnectedMsg(session.isClient(), session.getRemoteServer()), self);
} }
@Override @Override
public void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage) { public void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage) {
log.trace("{} Service [{}] received session actor msg {}", getType(session), log.trace("Received session actor msg from [{}][{}]: {}", session.getRemoteServer(), getType(session), clusterMessage);
session.getRemoteServer(),
clusterMessage);
service.onReceivedMsg(session.getRemoteServer(), clusterMessage); service.onReceivedMsg(session.getRemoteServer(), clusterMessage);
} }
@Override @Override
public void onError(GrpcSession session, Throwable t) { public void onError(GrpcSession session, Throwable t) {
log.warn("{} session got error -> {}", getType(session), session.getRemoteServer(), t); log.warn("[{}][{}] session got error -> {}", session.getRemoteServer(), getType(session), t);
manager.tell(new RpcSessionClosedMsg(session.isClient(), session.getRemoteServer()), self); manager.tell(new RpcSessionClosedMsg(session.isClient(), session.getRemoteServer()), self);
session.close(); session.close();
} }

View File

@ -36,7 +36,6 @@ import java.util.*;
/** /**
* @author Andrew Shvayka * @author Andrew Shvayka
*/ */
@Slf4j
public class RpcManagerActor extends ContextAwareActor { public class RpcManagerActor extends ContextAwareActor {
private final Map<ServerAddress, SessionActorInfo> sessionActors; private final Map<ServerAddress, SessionActorInfo> sessionActors;

View File

@ -69,6 +69,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private RuleNodeId firstId; private RuleNodeId firstId;
private RuleNodeCtx firstNode; private RuleNodeCtx firstNode;
private boolean started; private boolean started;
private String ruleChainName;
RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext
, ActorRef parent, ActorRef self) { , ActorRef parent, ActorRef self) {
@ -78,15 +79,24 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
this.nodeActors = new HashMap<>(); this.nodeActors = new HashMap<>();
this.nodeRoutes = new HashMap<>(); this.nodeRoutes = new HashMap<>();
this.service = systemContext.getRuleChainService(); this.service = systemContext.getRuleChainService();
this.ruleChainName = ruleChainId.toString();
} }
@Override @Override
public void start(ActorContext context) throws Exception { public String getComponentName() {
return null;
}
@Override
public void start(ActorContext context) {
if (!started) { if (!started) {
RuleChain ruleChain = service.findRuleChainById(entityId); RuleChain ruleChain = service.findRuleChainById(entityId);
ruleChainName = ruleChain.getName();
List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId); List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
// Creating and starting the actors; // Creating and starting the actors;
for (RuleNode ruleNode : ruleNodeList) { for (RuleNode ruleNode : ruleNodeList) {
log.trace("[{}][{}] Creating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode);
ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode); ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode)); nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
} }
@ -98,16 +108,19 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
} }
@Override @Override
public void onUpdate(ActorContext context) throws Exception { public void onUpdate(ActorContext context) {
RuleChain ruleChain = service.findRuleChainById(entityId); RuleChain ruleChain = service.findRuleChainById(entityId);
ruleChainName = ruleChain.getName();
List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId); List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
log.trace("[{}][{}] Updating rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
for (RuleNode ruleNode : ruleNodeList) { for (RuleNode ruleNode : ruleNodeList) {
RuleNodeCtx existing = nodeActors.get(ruleNode.getId()); RuleNodeCtx existing = nodeActors.get(ruleNode.getId());
if (existing == null) { if (existing == null) {
log.trace("[{}][{}] Creating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode);
ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode); ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode)); nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
} else { } else {
log.trace("[{}][{}] Updating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode);
existing.setSelf(ruleNode); existing.setSelf(ruleNode);
existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED), self); existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED), self);
} }
@ -116,6 +129,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
Set<RuleNodeId> existingNodes = ruleNodeList.stream().map(RuleNode::getId).collect(Collectors.toSet()); Set<RuleNodeId> existingNodes = ruleNodeList.stream().map(RuleNode::getId).collect(Collectors.toSet());
List<RuleNodeId> removedRules = nodeActors.keySet().stream().filter(node -> !existingNodes.contains(node)).collect(Collectors.toList()); List<RuleNodeId> removedRules = nodeActors.keySet().stream().filter(node -> !existingNodes.contains(node)).collect(Collectors.toList());
removedRules.forEach(ruleNodeId -> { removedRules.forEach(ruleNodeId -> {
log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId);
RuleNodeCtx removed = nodeActors.remove(ruleNodeId); RuleNodeCtx removed = nodeActors.remove(ruleNodeId);
removed.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED), self); removed.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED), self);
}); });
@ -124,7 +138,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
} }
@Override @Override
public void stop(ActorContext context) throws Exception { public void stop(ActorContext context) {
log.trace("[{}][{}] Stopping rule chain with {} nodes", tenantId, entityId, nodeActors.size());
nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(context::stop); nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(context::stop);
nodeActors.clear(); nodeActors.clear();
nodeRoutes.clear(); nodeRoutes.clear();
@ -133,7 +148,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
} }
@Override @Override
public void onClusterEventMsg(ClusterEventMsg msg) throws Exception { public void onClusterEventMsg(ClusterEventMsg msg) {
} }
@ -150,10 +165,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
// Populating the routes map; // Populating the routes map;
for (RuleNode ruleNode : ruleNodeList) { for (RuleNode ruleNode : ruleNodeList) {
List<EntityRelation> relations = service.getRuleNodeRelations(ruleNode.getId()); List<EntityRelation> relations = service.getRuleNodeRelations(ruleNode.getId());
log.trace("[{}][{}][{}] Processing rule node relations [{}]", tenantId, entityId, ruleNode.getId(), relations.size());
if (relations.size() == 0) { if (relations.size() == 0) {
nodeRoutes.put(ruleNode.getId(), Collections.emptyList()); nodeRoutes.put(ruleNode.getId(), Collections.emptyList());
} else { } else {
for (EntityRelation relation : relations) { for (EntityRelation relation : relations) {
log.trace("[{}][{}][{}] Processing rule node relation [{}]", tenantId, entityId, ruleNode.getId(), relation.getTo());
if (relation.getTo().getEntityType() == EntityType.RULE_NODE) { if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId())); RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
if (ruleNodeCtx == null) { if (ruleNodeCtx == null) {
@ -232,17 +249,20 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
int relationsCount = relations.size(); int relationsCount = relations.size();
EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId(); EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
if (relationsCount == 0) { if (relationsCount == 0) {
log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
if (ackId != null) { if (ackId != null) {
// TODO: Ack this message in Kafka // TODO: Ack this message in Kafka
// queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition()); // queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
} }
} else if (relationsCount == 1) { } else if (relationsCount == 1) {
for (RuleNodeRelation relation : relations) { for (RuleNodeRelation relation : relations) {
log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
pushToTarget(msg, relation.getOut(), relation.getType()); pushToTarget(msg, relation.getOut(), relation.getType());
} }
} else { } else {
for (RuleNodeRelation relation : relations) { for (RuleNodeRelation relation : relations) {
EntityId target = relation.getOut(); EntityId target = relation.getOut();
log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
switch (target.getEntityType()) { switch (target.getEntityType()) {
case RULE_NODE: case RULE_NODE:
enqueueAndForwardMsgCopyToNode(msg, target, relation.getType()); enqueueAndForwardMsgCopyToNode(msg, target, relation.getType());

View File

@ -15,7 +15,6 @@
*/ */
package org.thingsboard.server.actors.ruleChain; package org.thingsboard.server.actors.ruleChain;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ComponentActor; import org.thingsboard.server.actors.service.ComponentActor;
import org.thingsboard.server.actors.service.ContextBasedCreator; import org.thingsboard.server.actors.service.ContextBasedCreator;
@ -25,7 +24,6 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
@Slf4j
public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> { public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> {
private final RuleChainId ruleChainId; private final RuleChainId ruleChainId;
@ -62,7 +60,9 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
} }
private void onRuleNodeToSelfMsg(RuleNodeToSelfMsg msg) { private void onRuleNodeToSelfMsg(RuleNodeToSelfMsg msg) {
log.debug("[{}] Going to process rule msg: {}", id, msg.getMsg()); if (log.isDebugEnabled()) {
log.debug("[{}][{}][{}] Going to process rule msg: {}", ruleChainId, id, processor.getComponentName(), msg.getMsg());
}
try { try {
processor.onRuleToSelfMsg(msg); processor.onRuleToSelfMsg(msg);
increaseMessagesProcessedCount(); increaseMessagesProcessedCount();
@ -72,7 +72,9 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
} }
private void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) { private void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) {
log.debug("[{}] Going to process rule msg: {}", id, msg.getMsg()); if (log.isDebugEnabled()) {
log.debug("[{}][{}][{}] Going to process rule msg: {}", ruleChainId, id, processor.getComponentName(), msg.getMsg());
}
try { try {
processor.onRuleChainToRuleNodeMsg(msg); processor.onRuleChainToRuleNodeMsg(msg);
increaseMessagesProcessedCount(); increaseMessagesProcessedCount();

View File

@ -75,7 +75,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
} }
@Override @Override
public void stop(ActorContext context) throws Exception { public void stop(ActorContext context) {
if (tbNode != null) { if (tbNode != null) {
tbNode.destroy(); tbNode.destroy();
} }
@ -83,7 +83,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
} }
@Override @Override
public void onClusterEventMsg(ClusterEventMsg msg) throws Exception { public void onClusterEventMsg(ClusterEventMsg msg) {
} }
@ -111,6 +111,11 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
} }
} }
@Override
public String getComponentName() {
return ruleNode.getName();
}
private TbNode initComponent(RuleNode ruleNode) throws Exception { private TbNode initComponent(RuleNode ruleNode) throws Exception {
Class<?> componentClazz = Class.forName(ruleNode.getType()); Class<?> componentClazz = Class.forName(ruleNode.getType());
TbNode tbNode = (TbNode) (componentClazz.newInstance()); TbNode tbNode = (TbNode) (componentClazz.newInstance());

View File

@ -31,7 +31,6 @@ import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
/** /**
* @author Andrew Shvayka * @author Andrew Shvayka
*/ */
@Slf4j
public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgProcessor<T>> extends ContextAwareActor { public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgProcessor<T>> extends ContextAwareActor {
private long lastPersistedErrorTs = 0L; private long lastPersistedErrorTs = 0L;
@ -54,6 +53,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
@Override @Override
public void preStart() { public void preStart() {
try { try {
log.debug("[{}][{}][{}] Starting processor.", tenantId, id, id.getEntityType());
processor.start(context()); processor.start(context());
logLifecycleEvent(ComponentLifecycleEvent.STARTED); logLifecycleEvent(ComponentLifecycleEvent.STARTED);
if (systemContext.isStatisticsEnabled()) { if (systemContext.isStatisticsEnabled()) {
@ -78,6 +78,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
@Override @Override
public void postStop() { public void postStop() {
try { try {
log.debug("[{}][{}] Stopping processor.", tenantId, id, id.getEntityType());
processor.stop(context()); processor.stop(context());
logLifecycleEvent(ComponentLifecycleEvent.STOPPED); logLifecycleEvent(ComponentLifecycleEvent.STOPPED);
} catch (Exception e) { } catch (Exception e) {
@ -88,6 +89,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
} }
protected void onComponentLifecycleMsg(ComponentLifecycleMsg msg) { protected void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
log.debug("[{}][{}][{}] onComponentLifecycleMsg: [{}]", tenantId, id, id.getEntityType(), msg.getEvent());
try { try {
switch (msg.getEvent()) { switch (msg.getEvent()) {
case CREATED: case CREATED:
@ -148,9 +150,9 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
private void logAndPersist(String method, Exception e, boolean critical) { private void logAndPersist(String method, Exception e, boolean critical) {
errorsOccurred++; errorsOccurred++;
if (critical) { if (critical) {
log.warn("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e); log.warn("[{}][{}][{}] Failed to process {} msg: {}", id, tenantId, processor.getComponentName(), method, e);
} else { } else {
log.debug("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e); log.debug("[{}][{}][{}] Failed to process {} msg: {}", id, tenantId, processor.getComponentName(), method, e);
} }
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
if (ts - lastPersistedErrorTs > getErrorPersistFrequency()) { if (ts - lastPersistedErrorTs > getErrorPersistFrequency()) {

View File

@ -17,15 +17,16 @@ package org.thingsboard.server.actors.service;
import akka.actor.Terminated; import akka.actor.Terminated;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import akka.event.Logging; import org.slf4j.Logger;
import akka.event.LoggingAdapter; import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbActorMsg;
@Slf4j
public abstract class ContextAwareActor extends UntypedActor { public abstract class ContextAwareActor extends UntypedActor {
protected final Logger log = LoggerFactory.getLogger(getClass());
public static final int ENTITY_PACK_LIMIT = 1024; public static final int ENTITY_PACK_LIMIT = 1024;
protected final ActorSystemContext systemContext; protected final ActorSystemContext systemContext;

View File

@ -44,6 +44,8 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
this.entityId = id; this.entityId = id;
} }
public abstract String getComponentName();
public abstract void start(ActorContext context) throws Exception; public abstract void start(ActorContext context) throws Exception;
public abstract void stop(ActorContext context) throws Exception; public abstract void stop(ActorContext context) throws Exception;

View File

@ -48,7 +48,6 @@ import scala.concurrent.duration.Duration;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@Slf4j
public class TenantActor extends RuleChainManagerActor { public class TenantActor extends RuleChainManagerActor {
private final TenantId tenantId; private final TenantId tenantId;

View File

@ -71,7 +71,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
private int maxErrors; private int maxErrors;
private TbKafkaRequestTemplate<JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> kafkaTemplate; private TbKafkaRequestTemplate<JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> kafkaTemplate;
protected Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>(); private Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>();
@PostConstruct @PostConstruct
public void init() { public void init() {
@ -100,7 +100,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
responseBuilder.settings(kafkaSettings); responseBuilder.settings(kafkaSettings);
responseBuilder.topic(responseTopicPrefix + "." + nodeIdProvider.getNodeId()); responseBuilder.topic(responseTopicPrefix + "." + nodeIdProvider.getNodeId());
responseBuilder.clientId("js-" + nodeIdProvider.getNodeId()); responseBuilder.clientId("js-" + nodeIdProvider.getNodeId());
responseBuilder.groupId("rule-engine-node"); responseBuilder.groupId("rule-engine-node-" + nodeIdProvider.getNodeId());
responseBuilder.autoCommit(true); responseBuilder.autoCommit(true);
responseBuilder.autoCommitIntervalMs(autoCommitInterval); responseBuilder.autoCommitIntervalMs(autoCommitInterval);
responseBuilder.decoder(new RemoteJsResponseDecoder()); responseBuilder.decoder(new RemoteJsResponseDecoder());

View File

@ -149,6 +149,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
records.forEach(record -> { records.forEach(record -> {
try { try {
ToRuleEngineMsg toRuleEngineMsg = ruleEngineConsumer.decode(record); ToRuleEngineMsg toRuleEngineMsg = ruleEngineConsumer.decode(record);
log.trace("Forwarding message to rule engine {}", toRuleEngineMsg);
if (toRuleEngineMsg.hasToDeviceActorMsg()) { if (toRuleEngineMsg.hasToDeviceActorMsg()) {
forwardToDeviceActor(toRuleEngineMsg.getToDeviceActorMsg()); forwardToDeviceActor(toRuleEngineMsg.getToDeviceActorMsg());
} }
@ -175,18 +176,21 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
@Override @Override
public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) { public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
notificationsProducer.send(notificationsTopic + "." + nodeId, String topic = notificationsTopic + "." + nodeId;
new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB()).toString(), UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB());
ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build() ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build();
, new QueueCallbackAdaptor(onSuccess, onFailure)); log.trace("[{}][{}] Pushing session data to topic: {}", topic, sessionId, transportMsg);
notificationsProducer.send(topic, sessionId.toString(), transportMsg, new QueueCallbackAdaptor(onSuccess, onFailure));
} }
private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg) { private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg) {
TransportToDeviceActorMsgWrapper wrapper = new TransportToDeviceActorMsgWrapper(toDeviceActorMsg); TransportToDeviceActorMsgWrapper wrapper = new TransportToDeviceActorMsgWrapper(toDeviceActorMsg);
Optional<ServerAddress> address = routingService.resolveById(wrapper.getDeviceId()); Optional<ServerAddress> address = routingService.resolveById(wrapper.getDeviceId());
if (address.isPresent()) { if (address.isPresent()) {
log.trace("[{}] Pushing message to remote server: {}", address.get(), toDeviceActorMsg);
rpcService.tell(encodingService.convertToProtoDataMessage(address.get(), wrapper)); rpcService.tell(encodingService.convertToProtoDataMessage(address.get(), wrapper));
} else { } else {
log.trace("Pushing message to local server: {}", toDeviceActorMsg);
actorContext.getAppActor().tell(wrapper, ActorRef.noSender()); actorContext.getAppActor().tell(wrapper, ActorRef.noSender());
} }
} }

View File

@ -77,9 +77,10 @@ public class TBKafkaProducerTemplate<T> {
result.all().get(); result.all().get();
} catch (Exception e) { } catch (Exception e) {
if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) { if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) {
log.trace("[{}] Topic already exists: ", defaultTopic); log.trace("[{}] Topic already exists.", defaultTopic);
} else { } else {
log.trace("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e); log.info("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e);
throw new RuntimeException(e);
} }
} }
//Maybe this should not be cached, but we don't plan to change size of partitions //Maybe this should not be cached, but we don't plan to change size of partitions

View File

@ -23,6 +23,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeader;
@ -83,7 +86,13 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
CreateTopicsResult result = admin.createTopic(new NewTopic(responseTemplate.getTopic(), 1, (short) 1)); CreateTopicsResult result = admin.createTopic(new NewTopic(responseTemplate.getTopic(), 1, (short) 1));
result.all().get(); result.all().get();
} catch (Exception e) { } catch (Exception e) {
log.trace("Failed to create topic: {}", e.getMessage(), e); if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) {
log.trace("[{}] Topic already exists. ", responseTemplate.getTopic());
} else {
log.info("[{}] Failed to create topic: {}", responseTemplate.getTopic(), e.getMessage(), e);
throw new RuntimeException(e);
}
} }
this.requestTemplate.init(); this.requestTemplate.init();
tickTs = System.currentTimeMillis(); tickTs = System.currentTimeMillis();
@ -93,6 +102,7 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
while (!stopped) { while (!stopped) {
ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval)); ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval));
responses.forEach(response -> { responses.forEach(response -> {
log.trace("Received response to Kafka Template request: {}", response);
Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER); Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
Response decodedResponse = null; Response decodedResponse = null;
UUID requestId = null; UUID requestId = null;
@ -160,7 +170,13 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
SettableFuture<Response> future = SettableFuture.create(); SettableFuture<Response> future = SettableFuture.create();
pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future)); pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future));
request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId); request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId);
requestTemplate.send(key, request, headers, null); requestTemplate.send(key, request, headers, (metadata, exception) -> {
if (exception != null) {
log.trace("[{}] Failed to post the request", requestId, exception);
} else {
log.trace("[{}] Posted the request", requestId, metadata);
}
});
return future; return future;
} }

View File

@ -19,3 +19,5 @@ As result, in REPOSITORY column, next images should be present:
- Run the black box tests in the [msa/black-box-tests](../black-box-tests) directory: - Run the black box tests in the [msa/black-box-tests](../black-box-tests) directory:
mvn clean install -DblackBoxTests.skip=false mvn clean install -DblackBoxTests.skip=false

View File

@ -33,6 +33,9 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts; import org.apache.http.ssl.SSLContexts;
import org.junit.*; import org.junit.*;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.thingsboard.client.tools.RestClient; import org.thingsboard.client.tools.RestClient;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
@ -60,6 +63,33 @@ public abstract class AbstractContainerTest {
restClient.getRestTemplate().setRequestFactory(getRequestFactoryForSelfSignedCert()); restClient.getRestTemplate().setRequestFactory(getRequestFactoryForSelfSignedCert());
} }
@Rule
public TestRule watcher = new TestWatcher() {
protected void starting(Description description) {
log.info("=================================================");
log.info("STARTING TEST: {}" , description.getMethodName());
log.info("=================================================");
}
/**
* Invoked when a test succeeds
*/
protected void succeeded(Description description) {
log.info("=================================================");
log.info("SUCCEEDED TEST: {}" , description.getMethodName());
log.info("=================================================");
}
/**
* Invoked when a test fails
*/
protected void failed(Throwable e, Description description) {
log.info("=================================================");
log.info("FAILED TEST: {}" , description.getMethodName(), e);
log.info("=================================================");
}
};
protected Device createDevice(String name) { protected Device createDevice(String name) {
return restClient.createDevice(name + RandomStringUtils.randomAlphanumeric(7), "DEFAULT"); return restClient.createDevice(name + RandomStringUtils.randomAlphanumeric(7), "DEFAULT");
} }
@ -82,6 +112,7 @@ public abstract class AbstractContainerTest {
JsonObject wsRequest = new JsonObject(); JsonObject wsRequest = new JsonObject();
wsRequest.add(property.toString(), cmd); wsRequest.add(property.toString(), cmd);
wsClient.send(wsRequest.toString()); wsClient.send(wsRequest.toString());
wsClient.waitForFirstReply();
return wsClient; return wsClient;
} }

View File

@ -31,9 +31,11 @@ public class WsClient extends WebSocketClient {
private static final ObjectMapper mapper = new ObjectMapper(); private static final ObjectMapper mapper = new ObjectMapper();
private WsTelemetryResponse message; private WsTelemetryResponse message;
private CountDownLatch latch = new CountDownLatch(1);; private volatile boolean firstReplyReceived;
private CountDownLatch firstReply = new CountDownLatch(1);
private CountDownLatch latch = new CountDownLatch(1);
public WsClient(URI serverUri) { WsClient(URI serverUri) {
super(serverUri); super(serverUri);
} }
@ -43,6 +45,10 @@ public class WsClient extends WebSocketClient {
@Override @Override
public void onMessage(String message) { public void onMessage(String message) {
if (!firstReplyReceived) {
firstReplyReceived = true;
firstReply.countDown();
} else {
try { try {
WsTelemetryResponse response = mapper.readValue(message, WsTelemetryResponse.class); WsTelemetryResponse response = mapper.readValue(message, WsTelemetryResponse.class);
if (!response.getData().isEmpty()) { if (!response.getData().isEmpty()) {
@ -53,6 +59,7 @@ public class WsClient extends WebSocketClient {
log.error("ws message can't be read"); log.error("ws message can't be read");
} }
} }
}
@Override @Override
public void onClose(int code, String reason, boolean remote) { public void onClose(int code, String reason, boolean remote) {
@ -73,4 +80,13 @@ public class WsClient extends WebSocketClient {
} }
return null; return null;
} }
void waitForFirstReply() {
try {
firstReply.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("Timeout, ws message wasn't received");
throw new RuntimeException(e);
}
}
} }

View File

@ -28,6 +28,9 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.junit.*; import org.junit.*;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
@ -65,6 +68,7 @@ public class MqttClientTest extends AbstractContainerTest {
MqttClient mqttClient = getMqttClient(deviceCredentials, null); MqttClient mqttClient = getMqttClient(deviceCredentials, null);
mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload().toString().getBytes())); mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload().toString().getBytes()));
WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
log.info("Received telemetry: {}", actualLatestTelemetry);
wsClient.closeBlocking(); wsClient.closeBlocking();
Assert.assertEquals(4, actualLatestTelemetry.getData().size()); Assert.assertEquals(4, actualLatestTelemetry.getData().size());
@ -91,6 +95,7 @@ public class MqttClientTest extends AbstractContainerTest {
MqttClient mqttClient = getMqttClient(deviceCredentials, null); MqttClient mqttClient = getMqttClient(deviceCredentials, null);
mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload(ts).toString().getBytes())); mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload(ts).toString().getBytes()));
WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
log.info("Received telemetry: {}", actualLatestTelemetry);
wsClient.closeBlocking(); wsClient.closeBlocking();
Assert.assertEquals(4, actualLatestTelemetry.getData().size()); Assert.assertEquals(4, actualLatestTelemetry.getData().size());
@ -120,6 +125,7 @@ public class MqttClientTest extends AbstractContainerTest {
clientAttributes.addProperty("attr4", 73); clientAttributes.addProperty("attr4", 73);
mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes())); mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes()));
WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
log.info("Received telemetry: {}", actualLatestTelemetry);
wsClient.closeBlocking(); wsClient.closeBlocking();
Assert.assertEquals(4, actualLatestTelemetry.getData().size()); Assert.assertEquals(4, actualLatestTelemetry.getData().size());
@ -168,6 +174,7 @@ public class MqttClientTest extends AbstractContainerTest {
mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes())); mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes()));
MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS);
AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class); AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class);
log.info("Received telemetry: {}", attributes);
Assert.assertEquals(1, attributes.getClient().size()); Assert.assertEquals(1, attributes.getClient().size());
Assert.assertEquals(clientAttributeValue, attributes.getClient().get("clientAttr")); Assert.assertEquals(clientAttributeValue, attributes.getClient().get("clientAttr"));
@ -281,6 +288,7 @@ public class MqttClientTest extends AbstractContainerTest {
// Create a new root rule chain // Create a new root rule chain
RuleChainId ruleChainId = createRootRuleChainForRpcResponse(); RuleChainId ruleChainId = createRootRuleChainForRpcResponse();
TimeUnit.SECONDS.sleep(3);
// Send the request to the server // Send the request to the server
JsonObject clientRequest = new JsonObject(); JsonObject clientRequest = new JsonObject();
clientRequest.addProperty("method", "getResponse"); clientRequest.addProperty("method", "getResponse");
@ -360,12 +368,12 @@ public class MqttClientTest extends AbstractContainerTest {
return defaultRuleChain.get().getId(); return defaultRuleChain.get().getId();
} }
private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException { private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException {
MqttClientConfig clientConfig = new MqttClientConfig(); MqttClientConfig clientConfig = new MqttClientConfig();
clientConfig.setClientId("MQTT client from test"); clientConfig.setClientId("MQTT client from test");
clientConfig.setUsername(deviceCredentials.getCredentialsId()); clientConfig.setUsername(deviceCredentials.getCredentialsId());
MqttClient mqttClient = MqttClient.create(clientConfig, listener); MqttClient mqttClient = MqttClient.create(clientConfig, listener);
mqttClient.connect("localhost", 1883).sync(); mqttClient.connect("localhost", 1883).get();
return mqttClient; return mqttClient;
} }