Statistics Implementation

This commit is contained in:
Andrii Shvaika 2020-04-08 14:09:56 +03:00
parent ff3fd89ace
commit 3eaae1ef32
66 changed files with 1014 additions and 758 deletions

View File

@ -24,10 +24,9 @@ import akka.actor.Terminated;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.rulechain.SystemRuleChainManager;
import org.thingsboard.server.actors.tenant.TenantActor;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.Tenant;
@ -37,16 +36,14 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.tenant.TenantService;
import scala.concurrent.duration.Duration;
public class AppActor extends RuleChainManagerActor {
public class AppActor extends ContextAwareActor {
private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
private final TenantService tenantService;
@ -54,7 +51,7 @@ public class AppActor extends RuleChainManagerActor {
private boolean ruleChainsInitialized;
private AppActor(ActorSystemContext systemContext) {
super(systemContext, new SystemRuleChainManager(systemContext));
super(systemContext);
this.tenantService = systemContext.getTenantService();
this.tenantActors = HashBiMap.create();
}
@ -80,9 +77,6 @@ public class AppActor extends RuleChainManagerActor {
switch (msg.getMsgType()) {
case APP_INIT_MSG:
break;
case SEND_TO_CLUSTER_MSG:
onPossibleClusterMsg((SendToClusterMsg) msg);
break;
case PARTITION_CHANGE_MSG:
broadcast(msg);
break;
@ -98,7 +92,6 @@ public class AppActor extends RuleChainManagerActor {
case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
onToDeviceActorMsg((TenantAwareMsg) msg);
break;
default:
@ -110,7 +103,6 @@ public class AppActor extends RuleChainManagerActor {
private void initRuleChainsAndTenantActors() {
log.info("Starting main system actor.");
try {
initRuleChains();
if (systemContext.isTenantComponentsInitEnabled()) {
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
for (Tenant tenant : tenantIterator) {
@ -125,37 +117,22 @@ public class AppActor extends RuleChainManagerActor {
}
}
private void onPossibleClusterMsg(SendToClusterMsg msg) {
//TODO 2.5
// Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId());
// if (address.isPresent()) {
// systemContext.getRpcService().tell(
// systemContext.getEncodingService().convertToProtoDataMessage(address.get(), msg.getMsg()));
// } else {
self().tell(msg.getMsg(), ActorRef.noSender());
// }
}
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
if (SYSTEM_TENANT.equals(msg.getTenantId())) {
// this may be a notification about system entities created.
// log.warn("[{}] Invalid service to rule engine msg called. System messages are not supported yet: {}", SYSTEM_TENANT, msg);
msg.getTbMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
} else {
getOrCreateTenantActor(msg.getTenantId()).tell(msg, self());
}
}
@Override
protected void broadcast(Object msg) {
super.broadcast(msg);
tenantActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
}
private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
ActorRef target = null;
if (SYSTEM_TENANT.equals(msg.getTenantId())) {
target = getEntityActorRef(msg.getEntityId());
log.warn("Message has system tenant id: {}", msg);
} else {
if (msg.getEntityId().getEntityType() == EntityType.TENANT
&& msg.getEvent() == ComponentLifecycleEvent.DELETED) {

View File

@ -56,7 +56,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMs
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
@ -213,7 +213,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) {
TransportToDeviceActorMsg msg = wrapper.getMsg();
TbMsgCallback callback = wrapper.getCallback();
TbCallback callback = wrapper.getCallback();
if (msg.hasSessionEvent()) {
processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent());
}

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -110,7 +110,7 @@ class DefaultTbContext implements TbContext {
if (nodeCtx.getSelf().isDebugMode()) {
relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
}
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg), nodeCtx.getSelfActor());
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null), nodeCtx.getSelfActor());
}
@Override
@ -139,53 +139,61 @@ class DefaultTbContext implements TbContext {
mainCtx.getProducerProvider().getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), new SimpleTbQueueCallback(onSuccess, onFailure));
}
@Override
public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, String relationType) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null);
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
enqueueForTellNext(tpi, tbMsg, relationTypes, null, null);
enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), onSuccess, onFailure);
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
enqueueForTellNext(tpi, tbMsg, relationTypes, onSuccess, onFailure);
enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), onSuccess, onFailure);
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator());
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator());
enqueueForTellNext(tpi, tbMsg, relationTypes, onSuccess, onFailure);
enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure);
}
private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg tbMsg, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg tbMsg, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId();
RuleNodeId ruleNodeId = nodeCtx.getSelf().getId();
tbMsg = TbMsg.newMsg(tbMsg, ruleChainId, ruleNodeId);
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(tbMsg))
.addAllRelationTypes(relationTypes)
.build();
mainCtx.getProducerProvider().getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), new SimpleTbQueueCallback(onSuccess, onFailure));
.addAllRelationTypes(relationTypes);
if (failureMessage != null) {
msg.setFailureMessage(failureMessage);
}
mainCtx.getProducerProvider().getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg.build()), new SimpleTbQueueCallback(onSuccess, onFailure));
}
@Override
@ -207,7 +215,8 @@ class DefaultTbContext implements TbContext {
if (nodeCtx.getSelf().isDebugMode()) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, TbRelationTypes.FAILURE, th);
}
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), Collections.singleton(TbRelationTypes.FAILURE), msg), nodeCtx.getSelfActor());
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), Collections.singleton(TbRelationTypes.FAILURE),
msg, th != null ? th.getMessage() : null), nodeCtx.getSelfActor());
}
public void updateSelf(RuleNode self) {

View File

@ -1,48 +0,0 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.Data;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import java.io.Serializable;
/**
* Created by ashvayka on 19.03.18.
*/
@Data
final class RemoteToRuleChainTellNextMsg extends RuleNodeToRuleChainTellNextMsg implements TenantAwareMsg, RuleChainAwareMsg {
private static final long serialVersionUID = 2459605482321657447L;
private final TenantId tenantId;
private final RuleChainId ruleChainId;
public RemoteToRuleChainTellNextMsg(RuleNodeToRuleChainTellNextMsg original, TenantId tenantId, RuleChainId ruleChainId) {
super(original.getOriginator(), original.getRelationTypes(), original.getMsg());
this.tenantId = tenantId;
this.ruleChainId = ruleChainId;
}
@Override
public MsgType getMsgType() {
return MsgType.REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG;
}
}

View File

@ -22,6 +22,7 @@ import org.thingsboard.server.actors.service.ComponentActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
@ -30,9 +31,9 @@ import scala.concurrent.duration.Duration;
public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMessageProcessor> {
private RuleChainActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId) {
super(systemContext, tenantId, ruleChainId);
setProcessor(new RuleChainActorMessageProcessor(tenantId, ruleChainId, systemContext,
private RuleChainActor(ActorSystemContext systemContext, TenantId tenantId, RuleChain ruleChain) {
super(systemContext, tenantId, ruleChain.getId());
setProcessor(new RuleChainActorMessageProcessor(tenantId, ruleChain, systemContext,
context().parent(), context().self()));
}
@ -46,7 +47,6 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
break;
case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
break;
case RULE_CHAIN_TO_RULE_CHAIN_MSG:
@ -68,17 +68,17 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
private static final long serialVersionUID = 1L;
private final TenantId tenantId;
private final RuleChainId ruleChainId;
private final RuleChain ruleChain;
public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChainId pluginId) {
public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChain ruleChain) {
super(context);
this.tenantId = tenantId;
this.ruleChainId = pluginId;
this.ruleChain = ruleChain;
}
@Override
public RuleChainActor create() {
return new RuleChainActor(context, tenantId, ruleChainId);
return new RuleChainActor(context, tenantId, ruleChain);
}
}

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -37,6 +37,8 @@ import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.RuleNodeException;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.rule.RuleChainService;
@ -67,14 +69,16 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private final Map<RuleNodeId, List<RuleNodeRelation>> nodeRoutes;
private final RuleChainService service;
private final TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> producer;
private String ruleChainName;
private RuleNodeId firstId;
private RuleNodeCtx firstNode;
private boolean started;
RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext
RuleChainActorMessageProcessor(TenantId tenantId, RuleChain ruleChain, ActorSystemContext systemContext
, ActorRef parent, ActorRef self) {
super(systemContext, tenantId, ruleChainId);
super(systemContext, tenantId, ruleChain.getId());
this.ruleChainName = ruleChain.getName();
this.parent = parent;
this.self = self;
this.nodeActors = new HashMap<>();
@ -113,6 +117,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
public void onUpdate(ActorContext context) {
RuleChain ruleChain = service.findRuleChainById(tenantId, entityId);
if (ruleChain != null) {
ruleChainName = ruleChain.getName();
List<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId);
log.trace("[{}][{}] Updating rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
for (RuleNode ruleNode : ruleNodeList) {
@ -194,7 +199,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) {
TbMsg msg = envelope.getTbMsg();
log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg);
if (envelope.getRelationTypes() == null) {
if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) {
try {
checkActive();
RuleNodeId targetId = msg.getRuleNodeId();
@ -213,10 +218,10 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
msg.getCallback().onSuccess();
}
} catch (Exception e) {
envelope.getTbMsg().getCallback().onFailure(e);
envelope.getTbMsg().getCallback().onFailure(new RuleEngineException(e.getMessage()));
}
} else {
onTellNext(envelope.getTbMsg(), envelope.getTbMsg().getRuleNodeId(), envelope.getRelationTypes());
onTellNext(envelope.getTbMsg(), envelope.getTbMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage());
}
}
@ -230,10 +235,10 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
onTellNext(envelope.getMsg(), envelope.getOriginator(), envelope.getRelationTypes());
onTellNext(envelope.getMsg(), envelope.getOriginator(), envelope.getRelationTypes(), envelope.getFailureMessage());
}
private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes) {
private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {
try {
checkActive();
EntityId entityId = msg.getOriginator();
@ -245,9 +250,14 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
if (relationsCount == 0) {
log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
if (relationTypes.contains(TbRelationTypes.FAILURE)) {
log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
//TODO 2.5: Introduce our own RuleEngineFailureException to track what is wrong
msg.getCallback().onFailure(new RuntimeException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
if (ruleNodeCtx != null) {
msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
} else {
log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
//TODO 2.5: Introduce our own RuleEngineFailureException to track what is wrong
msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
}
} else {
msg.getCallback().onSuccess();
}
@ -265,7 +275,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
}
} catch (Exception e) {
msg.getCallback().onFailure(e);
msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
}
}

View File

@ -15,43 +15,89 @@
*/
package org.thingsboard.server.actors.ruleChain;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.Props;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import lombok.Getter;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.shared.rulechain.RuleChainManager;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.dao.rule.RuleChainService;
import java.util.function.Function;
/**
* Created by ashvayka on 15.03.18.
*/
public abstract class RuleChainManagerActor extends ContextAwareActor {
protected final RuleChainManager ruleChainManager;
protected final RuleChainService ruleChainService;
protected final TenantId tenantId;
private final RuleChainService ruleChainService;
private final BiMap<RuleChainId, ActorRef> actors;
@Getter
protected RuleChain rootChain;
@Getter
protected ActorRef rootChainActor;
public RuleChainManagerActor(ActorSystemContext systemContext, RuleChainManager ruleChainManager) {
public RuleChainManagerActor(ActorSystemContext systemContext, TenantId tenantId) {
super(systemContext);
this.ruleChainManager = ruleChainManager;
this.tenantId = tenantId;
this.actors = HashBiMap.create();
this.ruleChainService = systemContext.getRuleChainService();
}
protected void initRuleChains() {
ruleChainManager.init(this.context());
for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChains(tenantId, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
RuleChainId ruleChainId = ruleChain.getId();
log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());
//TODO: remove this cast making UUIDBased subclass of EntityId an interface and vice versa.
ActorRef actorRef = getOrCreateActor(this.context(), ruleChainId, id -> ruleChain);
visit(ruleChain, actorRef);
log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId());
}
}
protected void visit(RuleChain entity, ActorRef actorRef) {
if (entity != null && entity.isRoot()) {
rootChain = entity;
rootChainActor = actorRef;
}
}
public ActorRef getOrCreateActor(ActorContext context, RuleChainId ruleChainId) {
return getOrCreateActor(context, ruleChainId, eId -> ruleChainService.findRuleChainById(TenantId.SYS_TENANT_ID, eId));
}
public ActorRef getOrCreateActor(ActorContext context, RuleChainId ruleChainId, Function<RuleChainId, RuleChain> provider) {
return actors.computeIfAbsent(ruleChainId, eId -> {
RuleChain ruleChain = provider.apply(eId);
return context.actorOf(Props.create(new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain))
.withDispatcher(DefaultActorService.TENANT_RULE_DISPATCHER_NAME), eId.toString());
});
}
protected ActorRef getEntityActorRef(EntityId entityId) {
ActorRef target = null;
switch (entityId.getEntityType()) {
case RULE_CHAIN:
target = ruleChainManager.getOrCreateActor(this.context(), (RuleChainId) entityId);
break;
if (entityId.getEntityType() == EntityType.RULE_CHAIN) {
target = getOrCreateActor(this.context(), (RuleChainId) entityId);
}
return target;
}
protected void broadcast(Object msg) {
ruleChainManager.broadcast(msg);
actors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
}
public ActorRef get(RuleChainId id) {
return actors.get(id);
}
}

View File

@ -34,6 +34,7 @@ class RuleNodeToRuleChainTellNextMsg implements TbActorMsg, Serializable {
private final RuleNodeId originator;
private final Set<String> relationTypes;
private final TbMsg msg;
private final String failureMessage;
@Override
public MsgType getMsgType() {

View File

@ -1,93 +0,0 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.shared;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.ruleChain.RuleChainActor;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.common.data.SearchTextBased;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.page.PageDataIterable;
import java.util.HashMap;
import java.util.Map;
/**
* Created by ashvayka on 15.03.18.
*/
@Slf4j
public abstract class EntityActorsManager<T extends EntityId, A extends UntypedActor, M extends SearchTextBased<? extends UUIDBased>> {
protected final ActorSystemContext systemContext;
protected final BiMap<T, ActorRef> actors;
public EntityActorsManager(ActorSystemContext systemContext) {
this.systemContext = systemContext;
this.actors = HashBiMap.create();
}
protected abstract TenantId getTenantId();
protected abstract String getDispatcherName();
protected abstract Creator<A> creator(T entityId);
protected abstract PageDataIterable.FetchFunction<M> getFetchEntitiesFunction();
public void init(ActorContext context) {
for (M entity : new PageDataIterable<>(getFetchEntitiesFunction(), ContextAwareActor.ENTITY_PACK_LIMIT)) {
T entityId = (T) entity.getId();
log.debug("[{}|{}] Creating entity actor", entityId.getEntityType(), entityId.getId());
//TODO: remove this cast making UUIDBased subclass of EntityId an interface and vice versa.
ActorRef actorRef = getOrCreateActor(context, entityId);
visit(entity, actorRef);
log.debug("[{}|{}] Entity actor created.", entityId.getEntityType(), entityId.getId());
}
}
public void visit(M entity, ActorRef actorRef) {
}
public ActorRef getOrCreateActor(ActorContext context, T entityId) {
return actors.computeIfAbsent(entityId, eId ->
context.actorOf(Props.create(creator(eId))
.withDispatcher(getDispatcherName()), eId.toString()));
}
public void broadcast(Object msg) {
actors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
}
public void remove(T id) {
actors.remove(id);
}
public ActorRef get(T id) {
return actors.get(id);
}
}

View File

@ -1,59 +0,0 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.shared.rulechain;
import akka.actor.ActorRef;
import akka.japi.Creator;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.ruleChain.RuleChainActor;
import org.thingsboard.server.actors.shared.EntityActorsManager;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.dao.rule.RuleChainService;
/**
* Created by ashvayka on 15.03.18.
*/
@Slf4j
public abstract class RuleChainManager extends EntityActorsManager<RuleChainId, RuleChainActor, RuleChain> {
protected final RuleChainService service;
@Getter
protected RuleChain rootChain;
@Getter
protected ActorRef rootChainActor;
public RuleChainManager(ActorSystemContext systemContext) {
super(systemContext);
this.service = systemContext.getRuleChainService();
}
@Override
public Creator<RuleChainActor> creator(RuleChainId entityId) {
return new RuleChainActor.ActorCreator(systemContext, getTenantId(), entityId);
}
@Override
public void visit(RuleChain entity, ActorRef actorRef) {
if (entity != null && entity.isRoot()) {
rootChain = entity;
rootChainActor = actorRef;
}
}
}

View File

@ -1,48 +0,0 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.shared.rulechain;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.dao.model.ModelConstants;
import java.util.Collections;
public class SystemRuleChainManager extends RuleChainManager {
public SystemRuleChainManager(ActorSystemContext systemContext) {
super(systemContext);
}
@Override
protected FetchFunction<RuleChain> getFetchEntitiesFunction() {
return link -> new TextPageData<>(Collections.emptyList(), link);
}
@Override
protected TenantId getTenantId() {
return ModelConstants.SYSTEM_TENANT;
}
@Override
protected String getDispatcherName() {
return DefaultActorService.SYSTEM_RULE_DISPATCHER_NAME;
}
}

View File

@ -1,53 +0,0 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.shared.rulechain;
import akka.actor.ActorContext;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
import org.thingsboard.server.common.data.rule.RuleChain;
public class TenantRuleChainManager extends RuleChainManager {
private final TenantId tenantId;
public TenantRuleChainManager(ActorSystemContext systemContext, TenantId tenantId) {
super(systemContext);
this.tenantId = tenantId;
}
@Override
public void init(ActorContext context) {
super.init(context);
}
@Override
protected TenantId getTenantId() {
return tenantId;
}
@Override
protected String getDispatcherName() {
return DefaultActorService.TENANT_RULE_DISPATCHER_NAME;
}
@Override
protected FetchFunction<RuleChain> getFetchEntitiesFunction() {
return link -> service.findTenantRuleChains(tenantId, link);
}
}

View File

@ -24,7 +24,6 @@ import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Event;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
@Slf4j
public class StatsActor extends ContextAwareActor {

View File

@ -30,7 +30,6 @@ import org.thingsboard.server.actors.device.DeviceActorCreator;
import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.rulechain.TenantRuleChainManager;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.RuleChainId;
@ -43,6 +42,7 @@ import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.ServiceType;
import scala.concurrent.duration.Duration;
@ -51,12 +51,12 @@ import java.util.stream.Collectors;
public class TenantActor extends RuleChainManagerActor {
private final TenantId tenantId;
private final BiMap<DeviceId, ActorRef> deviceActors;
private boolean isRuleEngine;
private boolean isCore;
private TenantActor(ActorSystemContext systemContext, TenantId tenantId) {
super(systemContext, new TenantRuleChainManager(systemContext, tenantId));
this.tenantId = tenantId;
super(systemContext, tenantId);
this.deviceActors = HashBiMap.create();
}
@ -69,7 +69,11 @@ public class TenantActor extends RuleChainManagerActor {
public void preStart() {
log.info("[{}] Starting tenant actor.", tenantId);
try {
initRuleChains();
isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
if (isRuleEngine) {
initRuleChains();
}
log.info("[{}] Tenant actor started.", tenantId);
} catch (Exception e) {
log.warn("[{}] Unknown failure", tenantId, e);
@ -115,7 +119,6 @@ public class TenantActor extends RuleChainManagerActor {
onToDeviceActorMsg((DeviceAwareMsg) msg);
break;
case RULE_CHAIN_TO_RULE_CHAIN_MSG:
case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
onRuleChainMsg((RuleChainAwareMsg) msg);
break;
default:
@ -129,16 +132,19 @@ public class TenantActor extends RuleChainManagerActor {
}
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
if (!isRuleEngine) {
log.warn("RECEIVED INVALID MESSAGE: {}", msg);
}
TbMsg tbMsg = msg.getTbMsg();
if (tbMsg.getRuleChainId() == null) {
if (ruleChainManager.getRootChainActor() != null) {
ruleChainManager.getRootChainActor().tell(msg, self());
if (getRootChainActor() != null) {
getRootChainActor().tell(msg, self());
} else {
tbMsg.getCallback().onFailure(new RuntimeException("No Root Rule Chain available!"));
tbMsg.getCallback().onFailure(new RuleEngineException("No Root Rule Chain available!"));
log.info("[{}] No Root Chain: {}", tenantId, msg);
}
} else {
ActorRef ruleChainActor = ruleChainManager.get(tbMsg.getRuleChainId());
ActorRef ruleChainActor = get(tbMsg.getRuleChainId());
if (ruleChainActor != null) {
ruleChainActor.tell(msg, self());
} else {
@ -150,24 +156,29 @@ public class TenantActor extends RuleChainManagerActor {
}
private void onRuleChainMsg(RuleChainAwareMsg msg) {
ruleChainManager.getOrCreateActor(context(), msg.getRuleChainId()).tell(msg, self());
getOrCreateActor(context(), msg.getRuleChainId()).tell(msg, self());
}
private void onToDeviceActorMsg(DeviceAwareMsg msg) {
if (!isCore) {
log.warn("RECEIVED INVALID MESSAGE: {}", msg);
}
getOrCreateDeviceActor(msg.getDeviceId()).tell(msg, ActorRef.noSender());
}
private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
ActorRef target = getEntityActorRef(msg.getEntityId());
if (target != null) {
if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN) {
RuleChain ruleChain = systemContext.getRuleChainService().
findRuleChainById(tenantId, new RuleChainId(msg.getEntityId().getId()));
ruleChainManager.visit(ruleChain, target);
if (isRuleEngine) {
ActorRef target = getEntityActorRef(msg.getEntityId());
if (target != null) {
if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN) {
RuleChain ruleChain = systemContext.getRuleChainService().
findRuleChainById(tenantId, new RuleChainId(msg.getEntityId().getId()));
visit(ruleChain, target);
}
target.tell(msg, ActorRef.noSender());
} else {
log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg);
}
target.tell(msg, ActorRef.noSender());
} else {
log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg);
}
}
@ -214,15 +225,12 @@ public class TenantActor extends RuleChainManagerActor {
}
}
private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), new Function<Throwable, SupervisorStrategy.Directive>() {
@Override
public SupervisorStrategy.Directive apply(Throwable t) {
log.warn("[{}] Unknown failure", tenantId, t);
if (t instanceof ActorInitializationException) {
return SupervisorStrategy.stop();
} else {
return SupervisorStrategy.resume();
}
private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), t -> {
log.warn("[{}] Unknown failure", tenantId, t);
if (t instanceof ActorInitializationException) {
return SupervisorStrategy.stop();
} else {
return SupervisorStrategy.resume();
}
});

View File

@ -92,10 +92,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
try {
SessionMetaData sessionMd = internalSessionMap.get(session.getId());
if (sessionMd != null) {
log.info("[{}][{}] Processing {}", sessionMd.sessionRef.getSecurityCtx().getTenantId(), session.getId(), message.getPayload());
log.trace("[{}][{}] Processing {}", sessionMd.sessionRef.getSecurityCtx().getTenantId(), session.getId(), message.getPayload());
webSocketService.handleWebSocketMsg(sessionMd.sessionRef, message.getPayload());
} else {
log.warn("[{}] Failed to find session", session.getId());
log.trace("[{}] Failed to find session", session.getId());
session.close(CloseStatus.SERVER_ERROR.withReason("Session not found!"));
}
} catch (IOException e) {
@ -139,7 +139,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
if (sessionMd != null) {
processInWebSocketService(sessionMd.sessionRef, SessionEvent.onError(tError));
} else {
log.warn("[{}] Failed to find session", session.getId());
log.trace("[{}] Failed to find session", session.getId());
}
log.trace("[{}] Session transport error", session.getId(), tError);
}

View File

@ -25,7 +25,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto;
import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto;
@ -120,8 +120,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
ConcurrentMap<UUID, TbProtoQueueMsg<ToCoreMsg>> failedMap = new ConcurrentHashMap<>();
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
pendingMap.forEach((id, msg) -> {
log.info("[{}] Creating main callback for message: {}", id, msg.getValue());
TbMsgCallback callback = new MsgPackCallback<>(id, processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>(), failedMap);
log.trace("[{}] Creating main callback for message: {}", id, msg.getValue());
TbCallback callback = new TbPackCallback<>(id, processingTimeoutLatch, pendingMap, failedMap);
try {
ToCoreMsg toCoreMsg = msg.getValue();
if (toCoreMsg.hasToSubscriptionMgrMsg()) {
@ -182,7 +182,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
}
@Override
protected void handleNotification(UUID id, TbProtoQueueMsg<ToCoreNotificationMsg> msg, TbMsgCallback callback) {
protected void handleNotification(UUID id, TbProtoQueueMsg<ToCoreNotificationMsg> msg, TbCallback callback) {
ToCoreNotificationMsg toCoreMsg = msg.getValue();
if (toCoreMsg.hasToLocalSubscriptionServiceMsg()) {
log.trace("[{}] Forwarding message to local subscription service {}", id, toCoreMsg.getToLocalSubscriptionServiceMsg());
@ -200,7 +200,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
}
}
private void forwardToCoreRpcService(FromDeviceRPCResponseProto proto, TbMsgCallback callback) {
private void forwardToCoreRpcService(FromDeviceRPCResponseProto proto, TbCallback callback) {
RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB())
, proto.getResponse(), error);
@ -215,7 +215,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
}
}
private void forwardToLocalSubMgrService(LocalSubscriptionServiceMsgProto msg, TbMsgCallback callback) {
private void forwardToLocalSubMgrService(LocalSubscriptionServiceMsgProto msg, TbCallback callback) {
if (msg.hasSubUpdate()) {
localSubscriptionService.onSubscriptionUpdate(msg.getSubUpdate().getSessionId(), TbSubscriptionUtils.fromProto(msg.getSubUpdate()), callback);
} else {
@ -223,7 +223,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
}
}
private void forwardToSubMgrService(SubscriptionMgrMsgProto msg, TbMsgCallback callback) {
private void forwardToSubMgrService(SubscriptionMgrMsgProto msg, TbCallback callback) {
if (msg.hasAttributeSub()) {
subscriptionManagerService.addSubscription(TbSubscriptionUtils.fromProto(msg.getAttributeSub()), callback);
} else if (msg.hasTelemetrySub()) {
@ -248,21 +248,21 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
}
}
private void forwardToStateService(DeviceStateServiceMsgProto deviceStateServiceMsg, TbMsgCallback callback) {
private void forwardToStateService(DeviceStateServiceMsgProto deviceStateServiceMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceStateServiceMsg);
}
stateService.onQueueMsg(deviceStateServiceMsg, callback);
}
private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbMsgCallback callback) {
private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(toDeviceActorMsg);
}
actorContext.getAppActor().tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback), ActorRef.noSender());
}
private void throwNotHandled(Object msg, TbMsgCallback callback) {
private void throwNotHandled(Object msg, TbCallback callback) {
log.warn("Message not handled: {}", msg);
callback.onFailure(new RuntimeException("Message not handled!"));
}

View File

@ -27,8 +27,11 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.RuleNodeException;
import org.thingsboard.server.common.msg.queue.ServiceQueue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
@ -45,6 +48,7 @@ import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingDec
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategy;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory;
import org.thingsboard.server.service.stats.RuleEngineStatisticsService;
import javax.annotation.PostConstruct;
import java.util.Collections;
@ -69,20 +73,22 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
private long pollDuration;
@Value("${queue.rule-engine.pack-processing-timeout}")
private long packProcessingTimeout;
@Value("${queue.rule-engine.stats.enabled:false}")
@Value("${queue.rule-engine.stats.enabled:true}")
private boolean statsEnabled;
private final TbCoreConsumerStats stats = new TbCoreConsumerStats();
private final TbRuleEngineProcessingStrategyFactory factory;
private final TbRuleEngineQueueFactory tbRuleEngineQueueFactory;
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final RuleEngineStatisticsService statisticsService;
private final ConcurrentMap<String, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> consumers = new ConcurrentHashMap<>();
private final ConcurrentMap<String, TbRuleEngineQueueConfiguration> consumerConfigurations = new ConcurrentHashMap<>();
private final ConcurrentMap<String, TbRuleEngineConsumerStats> consumerStats = new ConcurrentHashMap<>();
public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory factory, TbQueueRuleEngineSettings ruleEngineSettings,
TbRuleEngineQueueFactory tbRuleEngineQueueFactory,
TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService,
ActorSystemContext actorContext, DataDecodingEncodingService encodingService) {
super(actorContext, encodingService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer());
this.statisticsService = statisticsService;
this.ruleEngineSettings = ruleEngineSettings;
this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory;
this.factory = factory;
@ -92,7 +98,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
public void init() {
super.init("tb-rule-engine-consumer", "tb-rule-engine-notifications-consumer");
for (TbRuleEngineQueueConfiguration configuration : ruleEngineSettings.getQueues()) {
consumerConfigurations.putIfAbsent(configuration.getName(), configuration);
consumers.computeIfAbsent(configuration.getName(), queueName -> tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration));
consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName()));
}
}
@ -107,7 +115,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
@Override
protected void launchMainConsumers() {
consumers.forEach((queue, consumer) -> launchConsumer(consumer, consumerConfigurations.get(queue)));
consumers.forEach((queue, consumer) -> launchConsumer(consumer, consumerConfigurations.get(queue), consumerStats.get(queue)));
}
@Override
@ -115,7 +123,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
consumers.values().forEach(TbQueueConsumer::unsubscribe);
}
private void launchConsumer(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, TbRuleEngineQueueConfiguration configuration) {
private void launchConsumer(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats) {
consumersExecutor.execute(() -> {
while (!stopped) {
try {
@ -123,7 +131,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
if (msgs.isEmpty()) {
continue;
}
TbRuleEngineProcessingStrategy strategy = factory.newInstance(configuration.getAckStrategy());
TbRuleEngineProcessingStrategy strategy = factory.newInstance(configuration.getName(), configuration.getAckStrategy());
TbRuleEngineProcessingDecision decision = null;
boolean firstAttempt = true;
while (!stopped && (firstAttempt || !decision.isCommit())) {
@ -137,21 +145,21 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
}
ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> successMap = new ConcurrentHashMap<>();
ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> failedMap = new ConcurrentHashMap<>();
ConcurrentMap<TenantId, RuleEngineException> exceptionsMap = new ConcurrentHashMap<>();
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
allMap.forEach((id, msg) -> {
log.info("[{}] Creating main callback for message: {}", id, msg.getValue());
TbMsgCallback callback = new MsgPackCallback<>(id, processingTimeoutLatch, allMap, successMap, failedMap);
log.trace("[{}] Creating main callback for message: {}", id, msg.getValue());
ToRuleEngineMsg toRuleEngineMsg = msg.getValue();
TenantId tenantId = new TenantId(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB()));
TbMsgCallback callback = new TbMsgPackCallback<>(id, tenantId, processingTimeoutLatch, allMap, successMap, failedMap, exceptionsMap);
try {
ToRuleEngineMsg toRuleEngineMsg = msg.getValue();
TenantId tenantId = new TenantId(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB()));
if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) {
forwardToRuleEngineActor(tenantId, toRuleEngineMsg, callback);
} else {
callback.onSuccess();
}
} catch (Throwable e) {
callback.onFailure(e);
} catch (Exception e) {
callback.onFailure(new RuleEngineException(e.getMessage()));
}
});
@ -159,7 +167,11 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
timeout = true;
}
decision = strategy.analyze(new TbRuleEngineProcessingResult(timeout, allMap, successMap, failedMap));
TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(timeout, allMap, successMap, failedMap, exceptionsMap);
decision = strategy.analyze(result);
if (statsEnabled) {
stats.log(result, decision.isCommit());
}
}
consumer.commit();
} catch (Exception e) {
@ -193,7 +205,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
}
@Override
protected void handleNotification(UUID id, TbProtoQueueMsg<ToRuleEngineNotificationMsg> msg, TbMsgCallback callback) throws Exception {
protected void handleNotification(UUID id, TbProtoQueueMsg<ToRuleEngineNotificationMsg> msg, TbCallback callback) throws Exception {
ToRuleEngineNotificationMsg nfMsg = msg.getValue();
if (nfMsg.getComponentLifecycleMsg() != null && !nfMsg.getComponentLifecycleMsg().isEmpty()) {
Optional<TbActorMsg> actorMsg = encodingService.decode(nfMsg.getComponentLifecycleMsg().toByteArray());
@ -219,18 +231,18 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
relationTypes = new HashSet<>(relationTypesList);
}
}
msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes);
msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage());
actorContext.getAppActor().tell(msg, ActorRef.noSender());
//TODO: 2.5 before release.
// if (statsEnabled) {
// stats.log(toDeviceActorMsg);
// }
}
@Scheduled(fixedDelayString = "${queue.rule-engine.stats.print-interval-ms}")
public void printStats() {
if (statsEnabled) {
stats.printStats();
long ts = System.currentTimeMillis();
consumerStats.forEach((queue, stats) -> {
stats.printStats();
statisticsService.reportQueueStats(ts, stats);
});
}
}

View File

@ -0,0 +1,78 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.RuleNodeException;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@Slf4j
public class TbMsgPackCallback<T> implements TbMsgCallback {
private final CountDownLatch processingTimeoutLatch;
private final ConcurrentMap<UUID, T> ackMap;
private final ConcurrentMap<UUID, T> successMap;
private final ConcurrentMap<UUID, T> failedMap;
private final UUID id;
private final TenantId tenantId;
private final ConcurrentMap<TenantId, RuleEngineException> firstExceptions;
public TbMsgPackCallback(UUID id, TenantId tenantId,
CountDownLatch processingTimeoutLatch,
ConcurrentMap<UUID, T> ackMap,
ConcurrentMap<UUID, T> successMap,
ConcurrentMap<UUID, T> failedMap,
ConcurrentMap<TenantId, RuleEngineException> firstExceptions) {
this.id = id;
this.tenantId = tenantId;
this.processingTimeoutLatch = processingTimeoutLatch;
this.ackMap = ackMap;
this.successMap = successMap;
this.failedMap = failedMap;
this.firstExceptions = firstExceptions;
}
@Override
public void onSuccess() {
log.trace("[{}] ON SUCCESS", id);
T msg = ackMap.remove(id);
if (msg != null) {
successMap.put(id, msg);
}
if (msg != null && ackMap.isEmpty()) {
processingTimeoutLatch.countDown();
}
}
@Override
public void onFailure(RuleEngineException e) {
log.trace("[{}] ON FAILURE", id, e);
T msg = ackMap.remove(id);
if (msg != null) {
failedMap.put(id, msg);
firstExceptions.putIfAbsent(tenantId, e);
}
if (ackMap.isEmpty()) {
processingTimeoutLatch.countDown();
}
}
}

View File

@ -16,30 +16,26 @@
package org.thingsboard.server.service.queue;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@Slf4j
public class MsgPackCallback<T> implements TbMsgCallback {
public class TbPackCallback<T> implements TbCallback {
private final CountDownLatch processingTimeoutLatch;
private final ConcurrentMap<UUID, T> ackMap;
private final ConcurrentMap<UUID, T> successMap;
private final ConcurrentMap<UUID, T> failedMap;
private final UUID id;
public MsgPackCallback(UUID id, CountDownLatch processingTimeoutLatch,
ConcurrentMap<UUID, T> ackMap,
ConcurrentMap<UUID, T> successMap,
ConcurrentMap<UUID, T> failedMap) {
public TbPackCallback(UUID id,
CountDownLatch processingTimeoutLatch,
ConcurrentMap<UUID, T> ackMap,
ConcurrentMap<UUID, T> failedMap) {
this.id = id;
this.processingTimeoutLatch = processingTimeoutLatch;
this.ackMap = ackMap;
this.successMap = successMap;
this.failedMap = failedMap;
}
@ -47,9 +43,6 @@ public class MsgPackCallback<T> implements TbMsgCallback {
public void onSuccess() {
log.trace("[{}] ON SUCCESS", id);
T msg = ackMap.remove(id);
if (msg != null) {
successMap.put(id, msg);
}
if (msg != null && ackMap.isEmpty()) {
processingTimeoutLatch.countDown();
}
@ -57,7 +50,7 @@ public class MsgPackCallback<T> implements TbMsgCallback {
@Override
public void onFailure(Throwable t) {
log.trace("[{}] ON FAILURE", id);
log.trace("[{}] ON FAILURE", id, t);
T msg = ackMap.remove(id);
if (msg != null) {
failedMap.put(id, msg);

View File

@ -15,38 +15,119 @@
*/
package org.thingsboard.server.service.queue;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.RuleNodeException;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Data
public class TbRuleEngineConsumerStats {
private final AtomicInteger totalCounter = new AtomicInteger(0);
private final AtomicInteger postTelemetryCounter = new AtomicInteger(0);
private final AtomicInteger postAttributesCounter = new AtomicInteger(0);
private final AtomicInteger toServerRPCCallRequestCounter = new AtomicInteger(0);
public static final String TOTAL_MSGS = "totalMsgs";
public static final String SUCCESSFUL_MSGS = "successfulMsgs";
public static final String TMP_TIMEOUT = "tmpTimeout";
public static final String TMP_FAILED = "tmpFailed";
public static final String TIMEOUT_MSGS = "timeoutMsgs";
public static final String FAILED_MSGS = "failedMsgs";
public static final String SUCCESSFUL_ITERATIONS = "successfulIterations";
public static final String FAILED_ITERATIONS = "failedIterations";
public void log(TransportProtos.TransportToRuleEngineMsg msg) {
totalCounter.incrementAndGet();
if (msg.hasPostTelemetry()) {
postTelemetryCounter.incrementAndGet();
}
if (msg.hasPostAttributes()) {
postAttributesCounter.incrementAndGet();
}
if (msg.hasToServerRPCCallRequest()) {
toServerRPCCallRequestCounter.incrementAndGet();
private final AtomicInteger totalMsgCounter = new AtomicInteger(0);
private final AtomicInteger successMsgCounter = new AtomicInteger(0);
private final AtomicInteger tmpTimeoutMsgCounter = new AtomicInteger(0);
private final AtomicInteger tmpFailedMsgCounter = new AtomicInteger(0);
private final AtomicInteger timeoutMsgCounter = new AtomicInteger(0);
private final AtomicInteger failedMsgCounter = new AtomicInteger(0);
private final AtomicInteger successIterationsCounter = new AtomicInteger(0);
private final AtomicInteger failedIterationsCounter = new AtomicInteger(0);
private final Map<String, AtomicInteger> counters = new HashMap<>();
private final ConcurrentMap<UUID, TbTenantRuleEngineStats> tenantStats = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantId, RuleEngineException> tenantExceptions = new ConcurrentHashMap<>();
private final String queueName;
public TbRuleEngineConsumerStats(String queueName) {
this.queueName = queueName;
counters.put(TOTAL_MSGS, totalMsgCounter);
counters.put(SUCCESSFUL_MSGS, successMsgCounter);
counters.put(TIMEOUT_MSGS, timeoutMsgCounter);
counters.put(FAILED_MSGS, failedMsgCounter);
counters.put(TMP_TIMEOUT, tmpTimeoutMsgCounter);
counters.put(TMP_FAILED, tmpFailedMsgCounter);
counters.put(SUCCESSFUL_ITERATIONS, successIterationsCounter);
counters.put(FAILED_ITERATIONS, failedIterationsCounter);
}
public void log(TbRuleEngineProcessingResult msg, boolean finalIterationForPack) {
int success = msg.getSuccessMap().size();
int pending = msg.getPendingMap().size();
int failed = msg.getFailureMap().size();
totalMsgCounter.addAndGet(success + pending + failed);
successMsgCounter.addAndGet(success);
msg.getSuccessMap().values().forEach(m -> getTenantStats(m).logSuccess());
if (finalIterationForPack) {
if (pending > 0 || failed > 0) {
timeoutMsgCounter.addAndGet(pending);
failedMsgCounter.addAndGet(failed);
if (pending > 0) {
msg.getPendingMap().values().forEach(m -> getTenantStats(m).logTimeout());
}
if (failed > 0) {
msg.getFailureMap().values().forEach(m -> getTenantStats(m).logFailed());
}
failedIterationsCounter.incrementAndGet();
} else {
successIterationsCounter.incrementAndGet();
}
} else {
failedIterationsCounter.incrementAndGet();
tmpTimeoutMsgCounter.addAndGet(pending);
tmpFailedMsgCounter.addAndGet(failed);
if (pending > 0) {
msg.getPendingMap().values().forEach(m -> getTenantStats(m).logTmpTimeout());
}
if (failed > 0) {
msg.getFailureMap().values().forEach(m -> getTenantStats(m).logTmpFailed());
}
}
msg.getExceptionsMap().forEach(tenantExceptions::putIfAbsent);
}
private TbTenantRuleEngineStats getTenantStats(TbProtoQueueMsg<ToRuleEngineMsg> m) {
ToRuleEngineMsg reMsg = m.getValue();
return tenantStats.computeIfAbsent(new UUID(reMsg.getTenantIdMSB(), reMsg.getTenantIdLSB()), TbTenantRuleEngineStats::new);
}
public void printStats() {
int total = totalCounter.getAndSet(0);
int total = totalMsgCounter.get();
if (total > 0) {
log.info("Transport total [{}] telemetry [{}] attributes [{}] toServerRpc [{}]",
total, postTelemetryCounter.getAndSet(0),
postAttributesCounter.getAndSet(0), toServerRPCCallRequestCounter.getAndSet(0));
StringBuilder stats = new StringBuilder();
counters.forEach((label, value) -> {
stats.append(label).append(" = [").append(value.get()).append("] ");
});
log.info("[{}] Stats: {}", queueName, stats);
}
}
public void reset() {
counters.values().forEach(counter -> counter.set(0));
tenantStats.clear();
tenantExceptions.clear();
}
}

View File

@ -0,0 +1,92 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Data
public class TbTenantRuleEngineStats {
private final UUID tenantId;
private final AtomicInteger totalMsgCounter = new AtomicInteger(0);
private final AtomicInteger successMsgCounter = new AtomicInteger(0);
private final AtomicInteger tmpTimeoutMsgCounter = new AtomicInteger(0);
private final AtomicInteger tmpFailedMsgCounter = new AtomicInteger(0);
private final AtomicInteger timeoutMsgCounter = new AtomicInteger(0);
private final AtomicInteger failedMsgCounter = new AtomicInteger(0);
private final Map<String, AtomicInteger> counters = new HashMap<>();
public TbTenantRuleEngineStats(UUID tenantId) {
this.tenantId = tenantId;
counters.put(TbRuleEngineConsumerStats.TOTAL_MSGS, totalMsgCounter);
counters.put(TbRuleEngineConsumerStats.SUCCESSFUL_MSGS, successMsgCounter);
counters.put(TbRuleEngineConsumerStats.TIMEOUT_MSGS, timeoutMsgCounter);
counters.put(TbRuleEngineConsumerStats.FAILED_MSGS, failedMsgCounter);
counters.put(TbRuleEngineConsumerStats.TMP_TIMEOUT, tmpTimeoutMsgCounter);
counters.put(TbRuleEngineConsumerStats.TMP_FAILED, tmpFailedMsgCounter);
}
public void logSuccess() {
totalMsgCounter.incrementAndGet();
successMsgCounter.incrementAndGet();
}
public void logFailed() {
totalMsgCounter.incrementAndGet();
failedMsgCounter.incrementAndGet();
}
public void logTimeout() {
totalMsgCounter.incrementAndGet();
timeoutMsgCounter.incrementAndGet();
}
public void logTmpFailed() {
totalMsgCounter.incrementAndGet();
tmpFailedMsgCounter.incrementAndGet();
}
public void logTmpTimeout() {
totalMsgCounter.incrementAndGet();
tmpTimeoutMsgCounter.incrementAndGet();
}
public void printStats() {
int total = totalMsgCounter.get();
if (total > 0) {
StringBuilder stats = new StringBuilder();
counters.forEach((label, value) -> {
stats.append(label).append(" = [").append(value.get()).append("]");
});
log.info("[{}] Stats: {}", tenantId, stats);
}
}
public void reset() {
counters.values().forEach(counter -> counter.set(0));
}
}

View File

@ -22,12 +22,12 @@ import org.springframework.context.event.EventListener;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
import org.thingsboard.server.service.queue.MsgPackCallback;
import org.thingsboard.server.service.queue.TbPackCallback;
import javax.annotation.PreDestroy;
import java.util.List;
@ -95,8 +95,8 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
ConcurrentMap<UUID, TbProtoQueueMsg<N>> failedMap = new ConcurrentHashMap<>();
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
pendingMap.forEach((id, msg) -> {
log.info("[{}] Creating notification callback for message: {}", id, msg.getValue());
TbMsgCallback callback = new MsgPackCallback<>(id, processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>(), failedMap);
log.trace("[{}] Creating notification callback for message: {}", id, msg.getValue());
TbCallback callback = new TbPackCallback<>(id, processingTimeoutLatch, pendingMap, failedMap);
try {
handleNotification(id, msg, callback);
} catch (Throwable e) {
@ -124,7 +124,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
});
}
protected abstract void handleNotification(UUID id, TbProtoQueueMsg<N> msg, TbMsgCallback callback) throws Exception;
protected abstract void handleNotification(UUID id, TbProtoQueueMsg<N> msg, TbCallback callback) throws Exception;
@PreDestroy
public void destroy() {

View File

@ -16,6 +16,8 @@
package org.thingsboard.server.service.queue.processing;
import lombok.Getter;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -25,24 +27,28 @@ import java.util.concurrent.ConcurrentMap;
public class TbRuleEngineProcessingResult {
@Getter
private boolean success;
private final boolean success;
@Getter
private boolean timeout;
private final boolean timeout;
@Getter
private ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> pendingMap;
private final ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> pendingMap;
@Getter
private ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> successMap;
private final ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> successMap;
@Getter
private ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> failureMap;
private final ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> failureMap;
@Getter
private final ConcurrentMap<TenantId, RuleEngineException> exceptionsMap;
public TbRuleEngineProcessingResult(boolean timeout,
ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> pendingMap,
ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> successMap,
ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> failureMap) {
ConcurrentMap<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> failureMap,
ConcurrentMap<TenantId, RuleEngineException> exceptionsMap) {
this.timeout = timeout;
this.pendingMap = pendingMap;
this.successMap = successMap;
this.failureMap = failureMap;
this.exceptionsMap = exceptionsMap;
this.success = !timeout && pendingMap.isEmpty() && failureMap.isEmpty();
}
}

View File

@ -32,24 +32,25 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class TbRuleEngineProcessingStrategyFactory {
public TbRuleEngineProcessingStrategy newInstance(TbRuleEngineQueueAckStrategyConfiguration configuration) {
public TbRuleEngineProcessingStrategy newInstance(String name, TbRuleEngineQueueAckStrategyConfiguration configuration) {
switch (configuration.getType()) {
case "SKIP_ALL":
return new SkipStrategy();
return new SkipStrategy(name);
case "RETRY_ALL":
return new RetryStrategy(true, true, true, configuration);
return new RetryStrategy(name, true, true, true, configuration);
case "RETRY_FAILED":
return new RetryStrategy(false, true, false, configuration);
return new RetryStrategy(name, false, true, false, configuration);
case "RETRY_TIMED_OUT":
return new RetryStrategy(false, false, true, configuration);
return new RetryStrategy(name, false, false, true, configuration);
case "RETRY_FAILED_AND_TIMED_OUT":
return new RetryStrategy(false, true, true, configuration);
return new RetryStrategy(name, false, true, true, configuration);
default:
throw new RuntimeException("TbRuleEngineProcessingStrategy with type " + configuration.getType() + " is not supported!");
}
}
private static class RetryStrategy implements TbRuleEngineProcessingStrategy {
private final String queueName;
private final boolean retrySuccessful;
private final boolean retryFailed;
private final boolean retryTimeout;
@ -60,7 +61,8 @@ public class TbRuleEngineProcessingStrategyFactory {
private int initialTotalCount;
private int retryCount;
public RetryStrategy(boolean retrySuccessful, boolean retryFailed, boolean retryTimeout, TbRuleEngineQueueAckStrategyConfiguration configuration) {
public RetryStrategy(String queueName, boolean retrySuccessful, boolean retryFailed, boolean retryTimeout, TbRuleEngineQueueAckStrategyConfiguration configuration) {
this.queueName = queueName;
this.retrySuccessful = retrySuccessful;
this.retryFailed = retryFailed;
this.retryTimeout = retryTimeout;
@ -80,10 +82,10 @@ public class TbRuleEngineProcessingStrategyFactory {
retryCount++;
double failedCount = result.getFailureMap().size() + result.getPendingMap().size();
if (maxRetries > 0 && retryCount > maxRetries) {
log.info("Skip reprocess of the rule engine pack due to max retries");
log.info("[{}] Skip reprocess of the rule engine pack due to max retries", queueName);
return new TbRuleEngineProcessingDecision(true, null);
} else if (maxAllowedFailurePercentage > 0 && (failedCount / initialTotalCount) > maxAllowedFailurePercentage) {
log.info("Skip reprocess of the rule engine pack due to max allowed failure percentage");
log.info("[{}] Skip reprocess of the rule engine pack due to max allowed failure percentage", queueName);
return new TbRuleEngineProcessingDecision(true, null);
} else {
ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> toReprocess = new ConcurrentHashMap<>(initialTotalCount);
@ -96,8 +98,7 @@ public class TbRuleEngineProcessingStrategyFactory {
if (retrySuccessful) {
result.getSuccessMap().forEach(toReprocess::put);
}
log.info("Going to reprocess {} messages", toReprocess.size());
//TODO: 2.5 Log most popular rule nodes by error count;
log.info("[{}] Going to reprocess {} messages", queueName, toReprocess.size());
if (log.isTraceEnabled()) {
toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, msg.getValue()));
}
@ -116,9 +117,15 @@ public class TbRuleEngineProcessingStrategyFactory {
private static class SkipStrategy implements TbRuleEngineProcessingStrategy {
private final String queueName;
public SkipStrategy(String name) {
this.queueName = name;
}
@Override
public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) {
log.info("Skip reprocess of the rule engine pack");
log.info("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailureMap().size(), result.getPendingMap().size());
return new TbRuleEngineProcessingDecision(true, null);
}
}

View File

@ -19,7 +19,6 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.thingsboard.rule.engine.api.RpcError;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import java.util.Optional;
import java.util.UUID;

View File

@ -22,11 +22,8 @@ import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import java.util.Optional;
/**
* Created by ashvayka on 16.04.18.
*/

View File

@ -55,7 +55,7 @@ import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
@ -242,7 +242,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
}
@Override
public void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbMsgCallback callback) {
public void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbCallback callback) {
try {
TenantId tenantId = new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
DeviceId deviceId = new DeviceId(new UUID(proto.getDeviceIdMSB(), proto.getDeviceIdLSB()));

View File

@ -20,7 +20,7 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.queue.TbCallback;
/**
* Created by ashvayka on 01.05.18.
@ -41,6 +41,6 @@ public interface DeviceStateService extends ApplicationListener<PartitionChangeE
void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout);
void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto serverAddress, TbMsgCallback bytes);
void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto serverAddress, TbCallback bytes);
}

View File

@ -0,0 +1,127 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.stats;
import com.google.common.util.concurrent.FutureCallback;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@TbRuleEngineComponent
@Service
@Slf4j
public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsService {
public static final FutureCallback<Void> CALLBACK = new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
}
@Override
public void onFailure(Throwable t) {
log.warn("Failed to persist statistics", t);
}
};
private final TbServiceInfoProvider serviceInfoProvider;
private final TelemetrySubscriptionService tsService;
private final Lock lock = new ReentrantLock();
private final AssetService assetService;
private final ConcurrentMap<TenantQueueKey, AssetId> tenantQueueAssets;
public DefaultRuleEngineStatisticsService(TelemetrySubscriptionService tsService, TbServiceInfoProvider serviceInfoProvider, AssetService assetService) {
this.tsService = tsService;
this.serviceInfoProvider = serviceInfoProvider;
this.assetService = assetService;
this.tenantQueueAssets = new ConcurrentHashMap<>();
}
@Override
public void reportQueueStats(long ts, TbRuleEngineConsumerStats ruleEngineStats) {
String queueName = ruleEngineStats.getQueueName();
ruleEngineStats.getTenantStats().forEach((id, stats) -> {
TenantId tenantId = new TenantId(id);
AssetId serviceAssetId = getServiceAssetId(tenantId, queueName);
if (stats.getTotalMsgCounter().get() > 0) {
List<TsKvEntry> tsList = stats.getCounters().entrySet().stream()
.map(kv -> new BasicTsKvEntry(ts, new LongDataEntry(kv.getKey(), (long) kv.getValue().get())))
.collect(Collectors.toList());
if (!tsList.isEmpty()) {
tsService.saveAndNotify(tenantId, serviceAssetId, tsList, CALLBACK);
}
}
});
ruleEngineStats.getTenantExceptions().forEach((tenantId, e) -> {
TsKvEntry tsKv = new BasicTsKvEntry(ts, new JsonDataEntry("ruleEngineException", e.toJsonString()));
tsService.saveAndNotify(tenantId, getServiceAssetId(tenantId, queueName), Collections.singletonList(tsKv), CALLBACK);
});
ruleEngineStats.reset();
}
private AssetId getServiceAssetId(TenantId tenantId, String queueName) {
TenantQueueKey key = new TenantQueueKey(tenantId, queueName);
AssetId assetId = tenantQueueAssets.get(key);
if (assetId == null) {
lock.lock();
try {
assetId = tenantQueueAssets.get(key);
if (assetId == null) {
Asset asset = assetService.findAssetByTenantIdAndName(tenantId, queueName + "_" + serviceInfoProvider.getServiceId());
if (asset == null) {
asset = new Asset();
asset.setTenantId(tenantId);
asset.setName(queueName + "_" + serviceInfoProvider.getServiceId());
asset.setType("TbServiceQueue");
asset = assetService.saveAsset(asset);
}
assetId = asset.getId();
tenantQueueAssets.put(key, assetId);
}
} finally {
lock.unlock();
}
}
return assetId;
}
@Data
private static class TenantQueueKey {
private final TenantId tenantId;
private final String queueName;
}
}

View File

@ -13,11 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.cluster;
package org.thingsboard.server.service.stats;
/**
* Created by ashvayka on 23.09.18.
*/
public enum ServerType {
CORE
import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats;
public interface RuleEngineStatisticsService {
void reportQueueStats(long ts, TbRuleEngineConsumerStats stats);
}

View File

@ -34,7 +34,7 @@ import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
@ -123,7 +123,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
}
@Override
public void addSubscription(TbSubscription subscription, TbMsgCallback callback) {
public void addSubscription(TbSubscription subscription, TbCallback callback) {
log.trace("[{}][{}][{}] Registering remote subscription for entity [{}]",
subscription.getServiceId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId());
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId());
@ -151,7 +151,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
}
@Override
public void cancelSubscription(String sessionId, int subscriptionId, TbMsgCallback callback) {
public void cancelSubscription(String sessionId, int subscriptionId, TbCallback callback) {
log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId);
Map<Integer, TbSubscription> sessionSubscriptions = subscriptionsByWsSessionId.get(sessionId);
if (sessionSubscriptions != null) {
@ -189,7 +189,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
}
@Override
public void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, TbMsgCallback callback) {
public void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, TbCallback callback) {
onLocalSubUpdate(entityId,
s -> {
if (TbSubscriptionType.TIMESERIES.equals(s.getType())) {
@ -213,7 +213,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
}
@Override
public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbMsgCallback callback) {
public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback) {
onLocalSubUpdate(entityId,
s -> {
if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) {
@ -261,7 +261,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
if (subscriptionUpdate != null && !subscriptionUpdate.isEmpty()) {
if (serviceId.equals(s.getServiceId())) {
SubscriptionUpdate update = new SubscriptionUpdate(s.getSubscriptionId(), subscriptionUpdate);
localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbMsgCallback.EMPTY);
localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbCallback.EMPTY);
} else {
TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId());
toCoreNotificationsProducer.send(tpi, toProto(s, subscriptionUpdate), null);

View File

@ -35,7 +35,7 @@ import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
@ -135,7 +135,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
if (currentPartitions.contains(tpi)) {
// Subscription is managed on the same server;
if (pushToLocalService) {
subscriptionManagerService.addSubscription(subscription, TbMsgCallback.EMPTY);
subscriptionManagerService.addSubscription(subscription, TbCallback.EMPTY);
}
} else {
// Push to the queue;
@ -145,7 +145,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
}
@Override
public void onSubscriptionUpdate(String sessionId, SubscriptionUpdate update, TbMsgCallback callback) {
public void onSubscriptionUpdate(String sessionId, SubscriptionUpdate update, TbCallback callback) {
TbSubscription subscription = subscriptionsBySessionId
.getOrDefault(sessionId, Collections.emptyMap()).get(update.getSubscriptionId());
if (subscription != null) {
@ -177,7 +177,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId());
if (currentPartitions.contains(tpi)) {
// Subscription is managed on the same server;
subscriptionManagerService.cancelSubscription(sessionId, subscriptionId, TbMsgCallback.EMPTY);
subscriptionManagerService.cancelSubscription(sessionId, subscriptionId, TbCallback.EMPTY);
} else {
// Push to the queue;
TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toCloseSubscriptionProto(subscription);

View File

@ -21,18 +21,18 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.queue.TbCallback;
import java.util.List;
public interface SubscriptionManagerService extends ApplicationListener<PartitionChangeEvent> {
void addSubscription(TbSubscription subscription, TbMsgCallback callback);
void addSubscription(TbSubscription subscription, TbCallback callback);
void cancelSubscription(String sessionId, int subscriptionId, TbMsgCallback callback);
void cancelSubscription(String sessionId, int subscriptionId, TbCallback callback);
void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, TbMsgCallback callback);
void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, TbCallback callback);
void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbMsgCallback callback);
void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback);
}

View File

@ -17,7 +17,7 @@ package org.thingsboard.server.service.subscription;
import org.thingsboard.server.queue.discovery.ClusterTopologyChangeEvent;
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
public interface TbLocalSubscriptionService {
@ -28,7 +28,7 @@ public interface TbLocalSubscriptionService {
void cancelAllSessionSubscriptions(String sessionId);
void onSubscriptionUpdate(String sessionId, SubscriptionUpdate update, TbMsgCallback callback);
void onSubscriptionUpdate(String sessionId, SubscriptionUpdate update, TbCallback callback);
void onApplicationEvent(PartitionChangeEvent event);

View File

@ -41,7 +41,7 @@ import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.service.subscription.SubscriptionManagerService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
@ -166,7 +166,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
if (currentPartitions.contains(tpi)) {
if (subscriptionManagerService.isPresent()) {
subscriptionManagerService.get().onAttributesUpdate(tenantId, entityId, scope, attributes, TbMsgCallback.EMPTY);
subscriptionManagerService.get().onAttributesUpdate(tenantId, entityId, scope, attributes, TbCallback.EMPTY);
} else {
log.warn("Possible misconfiguration because subscriptionManagerService is null!");
}
@ -180,7 +180,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
if (currentPartitions.contains(tpi)) {
if (subscriptionManagerService.isPresent()) {
subscriptionManagerService.get().onTimeSeriesUpdate(tenantId, entityId, ts, TbMsgCallback.EMPTY);
subscriptionManagerService.get().onTimeSeriesUpdate(tenantId, entityId, ts, TbCallback.EMPTY);
} else {
log.warn("Possible misconfiguration because subscriptionManagerService is null!");
}

View File

@ -17,11 +17,7 @@ package org.thingsboard.server.service.telemetry;
import org.springframework.context.ApplicationListener;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import org.thingsboard.server.service.telemetry.sub.SubscriptionState;
/**
* Created by ashvayka on 27.03.18.

View File

@ -1,80 +0,0 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.telemetry.sub;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.service.telemetry.TelemetryFeature;
import java.util.Map;
@Data
@AllArgsConstructor
public class Subscription {
private final SubscriptionState sub;
private final boolean local;
private ServerAddress server;
private long startTime;
private long endTime;
public Subscription(SubscriptionState sub, boolean local, ServerAddress server) {
this(sub, local, server, 0L, 0L);
}
public String getWsSessionId() {
return getSub().getWsSessionId();
}
public int getSubscriptionId() {
return getSub().getSubscriptionId();
}
public EntityId getEntityId() {
return getSub().getEntityId();
}
public TelemetryFeature getType() {
return getSub().getType();
}
public String getScope() {
return getSub().getScope();
}
public boolean isAllKeys() {
return getSub().isAllKeys();
}
public Map<String, Long> getKeyStates() {
return getSub().getKeyStates();
}
public void setKeyState(String key, long ts) {
getSub().getKeyStates().put(key, ts);
}
@Override
public String toString() {
return "Subscription{" +
"sub=" + sub +
", local=" + local +
", server=" + server +
'}';
}
}

View File

@ -22,7 +22,6 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.api.RuleChainTransactionService;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
@ -114,13 +113,6 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
// }
}
@Override
public void onRemoteTransactionMsg(ServerAddress serverAddress, byte[] data) {
endLocalTransaction(TbMsg.fromBytes(data, null), msg -> {
}, error -> {
});
}
private void addMsgToQueues(BlockingQueue<TbTransactionTask> queue, TbTransactionTask transactionTask) {
queue.offer(transactionTask);
timeoutQueue.offer(transactionTask);
@ -230,9 +222,4 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
callbackExecutor.executeAsync(task);
}
private void sendTransactionEventToRemoteServer(TbMsg msg, ServerAddress address) {
log.trace("[{}][{}] Originator is monitored on other server: {}", msg.getTransactionData().getOriginatorId(), msg.getTransactionData().getTransactionId(), address);
//TODO 2.5
// clusterRpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TRANSACTION_SERVICE_MESSAGE, TbMsg.toByteArray(msg));
}
}

View File

@ -23,7 +23,7 @@ import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.queue.TbCallback;
import java.io.Serializable;
import java.util.UUID;
@ -37,9 +37,9 @@ public class TransportToDeviceActorMsgWrapper implements TbActorMsg, DeviceAware
private final TenantId tenantId;
private final DeviceId deviceId;
private final TransportToDeviceActorMsg msg;
private final TbMsgCallback callback;
private final TbCallback callback;
public TransportToDeviceActorMsgWrapper(TransportToDeviceActorMsg msg, TbMsgCallback callback) {
public TransportToDeviceActorMsgWrapper(TransportToDeviceActorMsg msg, TbCallback callback) {
this.msg = msg;
this.callback = callback;
this.tenantId = new TenantId(new UUID(msg.getSessionInfo().getTenantIdMSB(), msg.getSessionInfo().getTenantIdLSB()));

View File

@ -27,8 +27,8 @@
<logger name="org.thingsboard.server" level="INFO" />
<logger name="akka" level="INFO" />
<logger name="org.thingsboard.server.service.queue" level="TRACE" />
<logger name="org.thingsboard.server.service.transport" level="TRACE" />
<!-- <logger name="org.thingsboard.server.service.queue" level="TRACE" />-->
<!-- <logger name="org.thingsboard.server.service.transport" level="TRACE" />-->
<root level="INFO">
<appender-ref ref="STDOUT"/>

View File

@ -561,7 +561,7 @@ queue:
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
stats:
enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:false}"
enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:true}"
print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}"
queues: # TODO 2.5: specify correct ENV variable names.
-
@ -577,7 +577,7 @@ queue:
failure-percentage: "${TB_QUEUE_RULE_ENGINE_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries;
-
name: "HighPriority"
name: "${TB_QUEUE_RULE_ENGINE_HP_QUEUE_NAME:HighPriority}"
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine.hp}"
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RULE_ENGINE_PARTITIONS:3}"
@ -594,7 +594,7 @@ queue:
poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
service:
type: "${TB_SERVICE_TYPE:monolith}" # monolith or tb-core or tb-rule-engine or tb-transport
type: "${TB_SERVICE_TYPE:monolith}" # monolith or tb-core or tb-rule-engine
# Unique id for this service (autogenerated if empty)
id: "${TB_SERVICE_ID:}"
tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.

View File

@ -35,8 +35,6 @@ import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.controller.AbstractRuleEngineControllerTest;
import org.thingsboard.server.dao.attributes.AttributesService;

View File

@ -39,8 +39,6 @@ import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.controller.AbstractRuleEngineControllerTest;
import org.thingsboard.server.dao.attributes.AttributesService;

View File

@ -33,11 +33,6 @@ public enum MsgType {
APP_INIT_MSG,
/**
* All messages, could be send to cluster
*/
SEND_TO_CLUSTER_MSG,
/**
* ADDED/UPDATED/DELETED events for main entities.
*

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -17,7 +17,6 @@ package org.thingsboard.server.common.msg;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@ -29,7 +28,6 @@ import org.thingsboard.server.common.msg.gen.MsgProtos;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.UUID;
/**

View File

@ -1,47 +0,0 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.cluster;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;
/**
* @author Andrew Shvayka
*/
@Data
@EqualsAndHashCode
public class ServerAddress implements Comparable<ServerAddress>, Serializable {
private final String host;
private final int port;
private final ServerType serverType;
@Override
public int compareTo(ServerAddress o) {
int result = this.host.compareTo(o.host);
if (result == 0) {
result = this.port - o.port;
}
return result;
}
@Override
public String toString() {
return '[' + host + ':' + port + ']';
}
}

View File

@ -33,6 +33,7 @@ public final class QueueToRuleEngineMsg implements TbActorMsg {
private final TenantId tenantId;
private final TbMsg tbMsg;
private final Set<String> relationTypes;
private final String failureMessage;
@Override
public MsgType getMsgType() {

View File

@ -0,0 +1,38 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.queue;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RuleEngineException extends Exception {
protected static final ObjectMapper mapper = new ObjectMapper();
public RuleEngineException(String message) {
super(message != null ? message : "Unknown");
}
public String toJsonString() {
try {
return mapper.writeValueAsString(mapper.createObjectNode().put("message", getMessage()));
} catch (JsonProcessingException e) {
log.warn("Failed to serialize exception ", e);
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,58 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.queue;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.rule.RuleNode;
@Slf4j
public class RuleNodeException extends RuleEngineException {
@Getter
private final String ruleChainName;
@Getter
private final String ruleNodeName;
@Getter
private final RuleChainId ruleChainId;
@Getter
private final RuleNodeId ruleNodeId;
public RuleNodeException(String message, String ruleChainName, RuleNode ruleNode) {
super(message);
this.ruleChainName = ruleChainName;
this.ruleNodeName = ruleNode.getName();
this.ruleChainId = ruleNode.getRuleChainId();
this.ruleNodeId = ruleNode.getId();
}
public String toJsonString() {
try {
return mapper.writeValueAsString(mapper.createObjectNode()
.put("ruleNodeId", ruleNodeId.toString())
.put("ruleChainId", ruleChainId.toString())
.put("ruleNodeName", ruleNodeName)
.put("ruleChainName", ruleChainName)
.put("message", getMessage()));
} catch (JsonProcessingException e) {
log.warn("Failed to serialize exception ", e);
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,37 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.queue;
public interface TbCallback {
TbCallback EMPTY = new TbCallback() {
@Override
public void onSuccess() {
}
@Override
public void onFailure(Throwable t) {
}
};
void onSuccess();
void onFailure(Throwable t);
}

View File

@ -25,13 +25,13 @@ public interface TbMsgCallback {
}
@Override
public void onFailure(Throwable t) {
public void onFailure(RuleEngineException e) {
}
};
void onSuccess();
void onFailure(Throwable t);
void onFailure(RuleEngineException e);
}

View File

@ -15,6 +15,8 @@
*/
package org.thingsboard.server.queue.common;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
@ -40,6 +42,6 @@ public class MultipleTbQueueTbMsgCallbackWrapper implements TbQueueCallback {
@Override
public void onFailure(Throwable t) {
tbMsgCallback.onFailure(t);
tbMsgCallback.onFailure(new RuleEngineException(t.getMessage()));
}
}

View File

@ -15,12 +15,12 @@
*/
package org.thingsboard.server.queue.common;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import java.util.concurrent.atomic.AtomicInteger;
public class TbQueueTbMsgCallbackWrapper implements TbQueueCallback {
private final TbMsgCallback tbMsgCallback;
@ -36,6 +36,6 @@ public class TbQueueTbMsgCallbackWrapper implements TbQueueCallback {
@Override
public void onFailure(Throwable t) {
tbMsgCallback.onFailure(t);
tbMsgCallback.onFailure(new RuleEngineException(t.getMessage()));
}
}

View File

@ -86,24 +86,6 @@ public class ConsistentHashPartitionService implements PartitionService {
partitionTopics.put(new ServiceQueue(ServiceType.TB_CORE), coreTopic);
}
// public Set<TopicPartitionInfo> getCurrentPartitions(ServiceType serviceType) {
// ServiceInfo currentService = serviceInfoProvider.getServiceInfo();
// TenantId tenantId = getSystemOrIsolatedTenantId(currentService);
// ServiceQueueKey serviceQueueKey = new ServiceQueueKey(serviceType, tenantId);
// List<Integer> partitions = myPartitions.get(serviceQueueKey);
// Set<TopicPartitionInfo> topicPartitions = new LinkedHashSet<>();
// for (Integer partition : partitions) {
// TopicPartitionInfo.TopicPartitionInfoBuilder tpi = TopicPartitionInfo.builder();
// tpi.topic(partitionTopics.get(serviceType));
// tpi.partition(partition);
// if (!tenantId.isNullUid()) {
// tpi.tenantId(tenantId);
// }
// topicPartitions.add(tpi.build());
// }
// return topicPartitions;
// }
@Override
public TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId) {
return resolve(new ServiceQueue(serviceType), tenantId, entityId);
@ -131,15 +113,7 @@ public class ConsistentHashPartitionService implements PartitionService {
Map<ServiceQueueKey, ConsistentHashCircle<ServiceInfo>> circles = new HashMap<>();
addNode(circles, currentService);
for (ServiceInfo other : otherServices) {
TenantId tenantId = getSystemOrIsolatedTenantId(other);
addNode(circles, other);
if (!tenantId.isNullUid()) {
isolatedTenants.putIfAbsent(tenantId, new HashSet<>());
for (String serviceType : other.getServiceTypesList()) {
isolatedTenants.get(tenantId).add(ServiceType.valueOf(serviceType.toUpperCase()));
}
}
}
ConcurrentMap<ServiceQueueKey, List<Integer>> oldPartitions = myPartitions;
TenantId myTenantId = getSystemOrIsolatedTenantId(currentService);
@ -214,6 +188,11 @@ public class ConsistentHashPartitionService implements PartitionService {
}
}
@Override
public Set<TenantId> getIsolatedTenants(ServiceType serviceType) {
throw new RuntimeException("Not Implemented!");
}
private Map<ServiceQueueKey, List<ServiceInfo>> getServiceKeyListMap(List<ServiceInfo> services) {
final Map<ServiceQueueKey, List<ServiceInfo>> currentMap = new HashMap<>();
services.forEach(serviceInfo -> {
@ -280,6 +259,12 @@ public class ConsistentHashPartitionService implements PartitionService {
private void addNode(Map<ServiceQueueKey, ConsistentHashCircle<ServiceInfo>> circles, ServiceInfo instance) {
TenantId tenantId = getSystemOrIsolatedTenantId(instance);
if (!tenantId.isNullUid()) {
isolatedTenants.putIfAbsent(tenantId, new HashSet<>());
for (String serviceType : instance.getServiceTypesList()) {
isolatedTenants.get(tenantId).add(ServiceType.valueOf(serviceType.toUpperCase()));
}
}
for (String serviceTypeStr : instance.getServiceTypesList()) {
ServiceType serviceType = ServiceType.valueOf(serviceTypeStr.toUpperCase());
if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {

View File

@ -34,6 +34,7 @@ import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
@ -58,6 +59,7 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
private List<ServiceType> serviceTypes;
private ServiceInfo serviceInfo;
private TenantId isolatedTenant;
@PostConstruct
public void init() {
@ -80,6 +82,7 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
UUID tenantId;
if (!StringUtils.isEmpty(tenantIdStr)) {
tenantId = UUID.fromString(tenantIdStr);
isolatedTenant = new TenantId(tenantId);
} else {
tenantId = TenantId.NULL_UUID;
}
@ -103,4 +106,14 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
public ServiceInfo getServiceInfo() {
return serviceInfo;
}
@Override
public boolean isService(ServiceType serviceType) {
return serviceTypes.contains(serviceType);
}
@Override
public Optional<TenantId> getIsolatedTenant() {
return Optional.ofNullable(isolatedTenant);
}
}

View File

@ -47,6 +47,8 @@ public interface PartitionService {
*/
Set<String> getAllServiceIds(ServiceType serviceType);
Set<TenantId> getIsolatedTenants(ServiceType serviceType);
/**
* Each Service should start a consumer for messages that target individual service instance based on serviceId.
* This topic is likely to have single partition, and is always assigned to the service.

View File

@ -15,12 +15,20 @@
*/
package org.thingsboard.server.queue.discovery;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo;
import java.util.Optional;
public interface TbServiceInfoProvider {
String getServiceId();
ServiceInfo getServiceInfo();
boolean isService(ServiceType serviceType);
Optional<TenantId> getIsolatedTenant();
}

View File

@ -17,7 +17,9 @@ package org.thingsboard.server.queue.settings;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@ -26,6 +28,7 @@ import java.util.List;
@Slf4j
@Data
@EnableAutoConfiguration
@Configuration
@ConfigurationProperties(prefix = "queue.rule-engine")
public class TbQueueRuleEngineSettings {

View File

@ -388,6 +388,7 @@ message ToRuleEngineMsg {
int64 tenantIdLSB = 2;
bytes tbMsg = 3;
repeated string relationTypes = 4;
string failureMessage = 5;
}
message ToRuleEngineNotificationMsg {

View File

@ -16,7 +16,6 @@
package org.thingsboard.rule.engine.api;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import java.util.function.Consumer;
@ -26,6 +25,4 @@ public interface RuleChainTransactionService {
void endTransaction(TbMsg msg, Consumer<TbMsg> onSuccess, Consumer<Throwable> onFailure);
void onRemoteTransactionMsg(ServerAddress serverAddress, byte[] bytes);
}

View File

@ -26,10 +26,8 @@ import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
@ -116,6 +114,8 @@ public interface TbContext {
*/
void enqueue(TbMsg msg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure);
void enqueueForTellFailure(TbMsg msg, String failureMessage);
void enqueueForTellNext(TbMsg msg, String relationType);
void enqueueForTellNext(TbMsg msg, Set<String> relationTypes);

View File

@ -0,0 +1,53 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.flow;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "acknowledge",
configClazz = EmptyNodeConfiguration.class,
nodeDescription = "Acknowledges the incoming message",
nodeDetails = "After acknowledgement, the message is pushed to related rule nodes. Useful if you don't care what happens to this message next.")
public class TbAckNode implements TbNode {
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
ctx.ack(msg);
ctx.tellSuccess(msg);
}
@Override
public void destroy() {
}
}

View File

@ -0,0 +1,57 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.flow;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.ScriptEngine;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import static org.thingsboard.common.util.DonAsynchron.withCallback;
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "checkpoint",
configClazz = TbCheckpointNodeConfiguration.class,
nodeDescription = "transfers the message to another queue",
nodeDetails = "After successful transfer incoming message is automatically acknowledged. Queue name is configurable.")
public class TbCheckpointNode implements TbNode {
private TbCheckpointNodeConfiguration config;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbCheckpointNodeConfiguration.class);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
ctx.enqueueForTellNext(msg, config.getQueueName(), TbRelationTypes.SUCCESS, () -> ctx.ack(msg), error -> ctx.tellFailure(msg, error));
}
@Override
public void destroy() {
}
}

View File

@ -13,28 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.cluster;
package org.thingsboard.rule.engine.flow;
import lombok.Data;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.rule.engine.api.NodeConfiguration;
@Data
public class SendToClusterMsg implements TbActorMsg {
private TbActorMsg msg;
private EntityId entityId;
public SendToClusterMsg(EntityId entityId, TbActorMsg msg) {
this.entityId = entityId;
this.msg = msg;
}
public class TbCheckpointNodeConfiguration implements NodeConfiguration<TbCheckpointNodeConfiguration> {
private String queueName;
@Override
public MsgType getMsgType() {
return MsgType.SEND_TO_CLUSTER_MSG;
public TbCheckpointNodeConfiguration defaultConfiguration() {
TbCheckpointNodeConfiguration configuration = new TbCheckpointNodeConfiguration();
configuration.setQueueName("HighPriority");
return configuration;
}
}

View File

@ -116,8 +116,7 @@ public class TbSendRPCRequestNode implements TbNode {
ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS);
} else {
TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name()));
ctx.enqueueForTellNext(next, TbRelationTypes.FAILURE);
ctx.enqueueForTellFailure(next, ruleEngineDeviceRpcResponse.getError().get().name());
}
});
ctx.ack(msg);

View File

@ -102,20 +102,44 @@ queue:
stats:
enabled: "${TB_QUEUE_CORE_STATS_ENABLED:false}"
print_interval_ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:10000}"
rule_engine:
rule-engine:
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine}"
poll_interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RULE_ENGINE_PARTITIONS:10}"
pack_processing_timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
stats:
enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:false}"
print_interval_ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}"
enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:true}"
print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}"
queues: # TODO 2.5: specify correct ENV variable names.
-
name: "Main"
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine.main}"
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RULE_ENGINE_PARTITIONS:10}"
pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
ack-strategy:
type: "${TB_QUEUE_RULE_ENGINE_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
# For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RULE_ENGINE_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries;
-
name: "HighPriority"
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine.hp}"
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RULE_ENGINE_PARTITIONS:3}"
pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
ack-strategy:
type: "${TB_QUEUE_RULE_ENGINE_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
# For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RULE_ENGINE_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRY_PAUSE:1}"# Time in seconds to wait in consumer thread before retries;
transport:
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
service:
type: "${TB_SERVICE_TYPE:tb-transport}" # monolith or tb-core or tb-rule-engine or tb-transport
type: "${TB_SERVICE_TYPE:tb-transport}"
# Unique id for this service (autogenerated if empty)
id: "${TB_SERVICE_ID:}"
tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.