Refactoring of Message Routing
This commit is contained in:
parent
638ca0e1d1
commit
c52d0d26d3
@ -90,6 +90,11 @@ class DefaultTbContext implements TbContext {
|
||||
this.nodeCtx = nodeCtx;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tellSuccess(TbMsg msg) {
|
||||
tellNext(msg, Collections.singleton(TbRelationTypes.SUCCESS), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tellNext(TbMsg msg, String relationType) {
|
||||
tellNext(msg, Collections.singleton(relationType), null);
|
||||
|
||||
@ -59,6 +59,15 @@ public interface TbContext {
|
||||
*
|
||||
*/
|
||||
|
||||
/**
|
||||
* Indicates that message was successfully processed by the rule node.
|
||||
* Sends message to all Rule Nodes in the Rule Chain
|
||||
* that are connected to the current Rule Node using "Success" relationType.
|
||||
*
|
||||
* @param msg
|
||||
*/
|
||||
void tellSuccess(TbMsg msg);
|
||||
|
||||
/**
|
||||
* Sends message to all Rule Nodes in the Rule Chain
|
||||
* that are connected to the current Rule Node using specified relationType.
|
||||
|
||||
@ -68,6 +68,8 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura
|
||||
ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Updated");
|
||||
} else if (alarmResult.isCleared) {
|
||||
ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Cleared");
|
||||
} else {
|
||||
ctx.tellSuccess(msg);
|
||||
}
|
||||
},
|
||||
t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
|
||||
|
||||
@ -63,7 +63,7 @@ public abstract class TbAbstractCustomerActionNode<C extends TbAbstractCustomerA
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
withCallback(processCustomerAction(ctx, msg),
|
||||
m -> ctx.tellNext(msg, "Success"),
|
||||
m -> ctx.tellSuccess(msg),
|
||||
t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
|
||||
@ -89,7 +89,7 @@ public class TbCopyAttributesToEntityViewNode implements TbNode {
|
||||
if ((endTime != 0 && endTime > now && startTime < now) || (endTime == 0 && startTime < now)) {
|
||||
if (DataConstants.ATTRIBUTES_UPDATED.equals(msg.getType()) ||
|
||||
DataConstants.ACTIVITY_EVENT.equals(msg.getType()) ||
|
||||
SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msg.getType()) ) {
|
||||
SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msg.getType())) {
|
||||
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData()));
|
||||
List<AttributeKvEntry> filteredAttributes =
|
||||
attributes.stream().filter(attr -> attributeContainsInEntityView(scope, attr.getKey(), entityView)).collect(Collectors.toList());
|
||||
@ -117,13 +117,14 @@ public class TbCopyAttributesToEntityViewNode implements TbNode {
|
||||
}
|
||||
List<String> filteredAttributes =
|
||||
attributes.stream().filter(attr -> attributeContainsInEntityView(scope, attr, entityView)).collect(Collectors.toList());
|
||||
if (filteredAttributes != null && !filteredAttributes.isEmpty()) {
|
||||
if (!filteredAttributes.isEmpty()) {
|
||||
ctx.getAttributesService().removeAll(ctx.getTenantId(), entityView.getId(), scope, filteredAttributes);
|
||||
transformAndTellNext(ctx, msg, entityView);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ctx.ack(msg);
|
||||
},
|
||||
t -> ctx.tellFailure(msg, t));
|
||||
} else {
|
||||
@ -135,8 +136,7 @@ public class TbCopyAttributesToEntityViewNode implements TbNode {
|
||||
}
|
||||
|
||||
private void transformAndTellNext(TbContext ctx, TbMsg msg, EntityView entityView) {
|
||||
TbMsg updMsg = ctx.transformMsg(msg, msg.getType(), entityView.getId(), msg.getMetaData(), msg.getData());
|
||||
ctx.tellNext(updMsg, SUCCESS);
|
||||
ctx.enqueueForTellNext(ctx.newMsg(msg.getType(), entityView.getId(), msg.getMetaData(), msg.getData()), SUCCESS);
|
||||
}
|
||||
|
||||
private boolean attributeContainsInEntityView(String scope, String attrKey, EntityView entityView) {
|
||||
|
||||
@ -58,7 +58,7 @@ public class TbLogNode implements TbNode {
|
||||
toString -> {
|
||||
ctx.logJsEvalResponse();
|
||||
log.info(toString);
|
||||
ctx.tellNext(msg, SUCCESS);
|
||||
ctx.tellSuccess(msg);
|
||||
},
|
||||
t -> {
|
||||
ctx.logJsEvalResponse();
|
||||
|
||||
@ -77,7 +77,7 @@ public class TbMsgCountNode implements TbNode {
|
||||
|
||||
//TODO 2.5: Callback?
|
||||
TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), metaData, gson.toJson(telemetryJson));
|
||||
ctx.enqueueForTellNext(tbMsg, SUCCESS, null, null);
|
||||
ctx.enqueueForTellNext(tbMsg, SUCCESS);
|
||||
scheduleTickMsg(ctx);
|
||||
} else {
|
||||
messagesProcessed.incrementAndGet();
|
||||
|
||||
@ -105,10 +105,8 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
|
||||
withCallback(save(msg, ctx), aVoid -> {
|
||||
ctx.tellNext(msg, SUCCESS);
|
||||
}, e -> ctx.tellFailure(msg, e), ctx.getDbCallbackExecutor());
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
withCallback(save(msg, ctx), aVoid -> ctx.tellSuccess(msg), e -> ctx.tellFailure(msg, e), ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -74,11 +74,8 @@ public class TbSnsNode implements TbNode {
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
|
||||
withCallback(publishMessageAsync(ctx, msg),
|
||||
m -> ctx.tellNext(m, TbRelationTypes.SUCCESS),
|
||||
t -> {
|
||||
TbMsg next = processException(ctx, msg, t);
|
||||
ctx.tellFailure(next, t);
|
||||
});
|
||||
ctx::tellSuccess,
|
||||
t -> ctx.tellFailure(processException(ctx, msg, t), t));
|
||||
}
|
||||
|
||||
private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) {
|
||||
|
||||
@ -80,13 +80,10 @@ public class TbSqsNode implements TbNode {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
withCallback(publishMessageAsync(ctx, msg),
|
||||
m -> ctx.tellNext(m, TbRelationTypes.SUCCESS),
|
||||
t -> {
|
||||
TbMsg next = processException(ctx, msg, t);
|
||||
ctx.tellFailure(next, t);
|
||||
});
|
||||
ctx::tellSuccess,
|
||||
t -> ctx.tellFailure(processException(ctx, msg, t), t));
|
||||
}
|
||||
|
||||
private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) {
|
||||
|
||||
@ -97,7 +97,7 @@ public class TbMsgGeneratorNode implements TbNode {
|
||||
withCallback(generate(ctx),
|
||||
m -> {
|
||||
if (initialized && (config.getMsgCount() == TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT || currentMsgCount < config.getMsgCount())) {
|
||||
ctx.tellNext(m, SUCCESS);
|
||||
ctx.enqueueForTellNext(m, SUCCESS);
|
||||
scheduleTickMsg(ctx);
|
||||
currentMsgCount++;
|
||||
}
|
||||
|
||||
@ -65,13 +65,14 @@ public class TbMsgDelayNode implements TbNode {
|
||||
if (msg.getType().equals(TB_MSG_DELAY_NODE_MSG)) {
|
||||
TbMsg pendingMsg = pendingMsgs.remove(UUID.fromString(msg.getData()));
|
||||
if (pendingMsg != null) {
|
||||
ctx.tellNext(pendingMsg, SUCCESS);
|
||||
ctx.enqueueForTellNext(pendingMsg, SUCCESS);
|
||||
}
|
||||
} else {
|
||||
if(pendingMsgs.size() < config.getMaxPendingMsgs()) {
|
||||
if (pendingMsgs.size() < config.getMaxPendingMsgs()) {
|
||||
pendingMsgs.put(msg.getId(), msg);
|
||||
TbMsg tickMsg = ctx.newMsg(TB_MSG_DELAY_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), msg.getId().toString());
|
||||
ctx.tellSelf(tickMsg, getDelay(msg));
|
||||
ctx.ack(msg);
|
||||
} else {
|
||||
ctx.tellFailure(msg, new RuntimeException("Max limit of pending messages reached!"));
|
||||
}
|
||||
|
||||
@ -76,14 +76,13 @@ public class TbCheckAlarmStatusNode implements TbNode {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (isPresent) {
|
||||
ctx.tellNext(msg, "True");
|
||||
} else {
|
||||
ctx.tellNext(msg, "False");
|
||||
}
|
||||
} else {
|
||||
ctx.tellFailure(msg, new TbNodeException("No such Alarm found."));
|
||||
ctx.tellFailure(msg, new TbNodeException("No such alarm found."));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -44,7 +44,7 @@ public class TbMsgTypeFilterNode implements TbNode {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
ctx.tellNext(msg, config.getMessageTypes().contains(msg.getType()) ? "True" : "False");
|
||||
}
|
||||
|
||||
|
||||
@ -45,7 +45,7 @@ public class TbMsgTypeSwitchNode implements TbNode {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
String relationType;
|
||||
if (msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name())) {
|
||||
relationType = "Post attributes";
|
||||
|
||||
@ -42,7 +42,7 @@ public class TbOriginatorTypeFilterNode implements TbNode {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
EntityType originatorType = msg.getOriginator().getEntityType();
|
||||
ctx.tellNext(msg, config.getOriginatorTypes().contains(originatorType) ? "True" : "False");
|
||||
}
|
||||
|
||||
@ -99,7 +99,7 @@ public class TbPubSubNode implements TbNode {
|
||||
ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
|
||||
public void onSuccess(String messageId) {
|
||||
TbMsg next = processPublishResult(ctx, msg, messageId);
|
||||
ctx.tellNext(next, TbRelationTypes.SUCCESS);
|
||||
ctx.tellSuccess(next);
|
||||
}
|
||||
|
||||
public void onFailure(Throwable t) {
|
||||
|
||||
@ -47,11 +47,12 @@ import java.util.concurrent.TimeoutException;
|
||||
type = ComponentType.ACTION,
|
||||
name = "gps geofencing events",
|
||||
configClazz = TbGpsGeofencingActionNodeConfiguration.class,
|
||||
relationTypes = {"Entered", "Left", "Inside", "Outside"},
|
||||
relationTypes = {"Success", "Entered", "Left", "Inside", "Outside"},
|
||||
nodeDescription = "Produces incoming messages using GPS based geofencing",
|
||||
nodeDetails = "Extracts latitude and longitude parameters from incoming message and returns different events based on configuration parameters",
|
||||
uiResources = {"static/rulenode/rulenode-core-config.js"},
|
||||
configDirective = "tbActionNodeGpsGeofencingConfig")
|
||||
configDirective = "tbActionNodeGpsGeofencingConfig"
|
||||
)
|
||||
public class TbGpsGeofencingActionNode extends AbstractGeofencingNode<TbGpsGeofencingActionNodeConfiguration> {
|
||||
|
||||
private final Map<EntityId, EntityGeofencingState> entityStates = new HashMap<>();
|
||||
@ -78,17 +79,26 @@ public class TbGpsGeofencingActionNode extends AbstractGeofencingNode<TbGpsGeofe
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
boolean told = false;
|
||||
if (entityState.getStateSwitchTime() == 0L || entityState.isInside() != matches) {
|
||||
switchState(ctx, msg.getOriginator(), entityState, matches, ts);
|
||||
ctx.tellNext(msg, matches ? "Entered" : "Left");
|
||||
} else if (!entityState.isStayed()) {
|
||||
long stayTime = ts - entityState.getStateSwitchTime();
|
||||
if (stayTime > (entityState.isInside() ?
|
||||
TimeUnit.valueOf(config.getMinInsideDurationTimeUnit()).toMillis(config.getMinInsideDuration()) : TimeUnit.valueOf(config.getMinOutsideDurationTimeUnit()).toMillis(config.getMinOutsideDuration()))) {
|
||||
setStaid(ctx, msg.getOriginator(), entityState);
|
||||
ctx.tellNext(msg, entityState.isInside() ? "Inside" : "Outside");
|
||||
told = true;
|
||||
} else {
|
||||
if (!entityState.isStayed()) {
|
||||
long stayTime = ts - entityState.getStateSwitchTime();
|
||||
if (stayTime > (entityState.isInside() ?
|
||||
TimeUnit.valueOf(config.getMinInsideDurationTimeUnit()).toMillis(config.getMinInsideDuration()) : TimeUnit.valueOf(config.getMinOutsideDurationTimeUnit()).toMillis(config.getMinOutsideDuration()))) {
|
||||
setStaid(ctx, msg.getOriginator(), entityState);
|
||||
ctx.tellNext(msg, entityState.isInside() ? "Inside" : "Outside");
|
||||
told = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!told) {
|
||||
ctx.tellSuccess(msg);
|
||||
}
|
||||
}
|
||||
|
||||
private void switchState(TbContext ctx, EntityId entityId, EntityGeofencingState entityState, boolean matches, long ts) {
|
||||
|
||||
@ -64,8 +64,7 @@ public class TbKafkaNode implements TbNode {
|
||||
properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getLinger());
|
||||
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.getBufferMemory());
|
||||
if (config.getOtherProperties() != null) {
|
||||
config.getOtherProperties()
|
||||
.forEach((k,v) -> properties.put(k, v));
|
||||
config.getOtherProperties().forEach(properties::put);
|
||||
}
|
||||
try {
|
||||
this.producer = new KafkaProducer<>(properties);
|
||||
@ -75,7 +74,7 @@ public class TbKafkaNode implements TbNode {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg.getMetaData());
|
||||
try {
|
||||
producer.send(new ProducerRecord<>(topic, msg.getData()),
|
||||
|
||||
@ -34,7 +34,6 @@ import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.thingsboard.common.util.DonAsynchron.withCallback;
|
||||
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
|
||||
|
||||
@Slf4j
|
||||
@RuleNode(
|
||||
@ -79,7 +78,7 @@ public class TbSendEmailNode implements TbNode {
|
||||
sendEmail(ctx, email);
|
||||
return null;
|
||||
}),
|
||||
ok -> ctx.tellNext(msg, SUCCESS),
|
||||
ok -> ctx.tellSuccess(msg),
|
||||
fail -> ctx.tellFailure(msg, fail));
|
||||
} catch (Exception ex) {
|
||||
ctx.tellFailure(msg, ex);
|
||||
|
||||
@ -42,7 +42,6 @@ import java.util.stream.Collectors;
|
||||
|
||||
import static org.thingsboard.common.util.DonAsynchron.withCallback;
|
||||
import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
|
||||
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
|
||||
import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
|
||||
import static org.thingsboard.server.common.data.DataConstants.LATEST_TS;
|
||||
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
|
||||
@ -100,7 +99,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
|
||||
if (!failuresMap.isEmpty()) {
|
||||
throw reportFailures(failuresMap);
|
||||
}
|
||||
ctx.tellNext(msg, SUCCESS);
|
||||
ctx.tellSuccess(msg);
|
||||
}, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
|
||||
@ -57,7 +57,7 @@ public abstract class TbAbstractGetEntityDetailsNode<C extends TbAbstractGetEnti
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
withCallback(getDetails(ctx, msg),
|
||||
m -> ctx.tellNext(m, SUCCESS),
|
||||
ctx::tellSuccess,
|
||||
t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
|
||||
@ -50,8 +50,7 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
try {
|
||||
withCallback(
|
||||
findEntityAsync(ctx, msg.getOriginator()),
|
||||
withCallback(findEntityAsync(ctx, msg.getOriginator()),
|
||||
entityId -> safeGetAttributes(ctx, msg, entityId),
|
||||
t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
|
||||
} catch (Throwable th) {
|
||||
@ -88,7 +87,7 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
|
||||
String attrName = config.getAttrMapping().get(r.getKey());
|
||||
msg.getMetaData().putValue(attrName, r.getValueAsString());
|
||||
});
|
||||
ctx.tellNext(msg, SUCCESS);
|
||||
ctx.tellSuccess(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -49,10 +49,10 @@ public class TbGetOriginatorFieldsNode implements TbNode {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
try {
|
||||
withCallback(putEntityFields(ctx, msg.getOriginator(), msg),
|
||||
i -> ctx.tellNext(msg, SUCCESS), t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
|
||||
i -> ctx.tellSuccess(msg), t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
|
||||
} catch (Throwable th) {
|
||||
ctx.tellFailure(msg, th);
|
||||
}
|
||||
|
||||
@ -106,8 +106,7 @@ public class TbGetTelemetryNode implements TbNode {
|
||||
ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(msg));
|
||||
DonAsynchron.withCallback(list, data -> {
|
||||
process(data, msg);
|
||||
TbMsg newMsg = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData());
|
||||
ctx.tellNext(newMsg, SUCCESS);
|
||||
ctx.tellSuccess(ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData()));
|
||||
}, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor());
|
||||
} catch (Exception e) {
|
||||
ctx.tellFailure(msg, e);
|
||||
|
||||
@ -72,12 +72,12 @@ public class TbMqttNode implements TbNode {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
String topic = TbNodeUtils.processPattern(this.config.getTopicPattern(), msg.getMetaData());
|
||||
this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE)
|
||||
.addListener(future -> {
|
||||
if (future.isSuccess()) {
|
||||
ctx.tellNext(msg, TbRelationTypes.SUCCESS);
|
||||
ctx.tellSuccess(msg);
|
||||
} else {
|
||||
TbMsg next = processException(ctx, msg, future.cause());
|
||||
ctx.tellFailure(next, future.cause());
|
||||
|
||||
@ -74,9 +74,9 @@ public class TbRabbitMqNode implements TbNode {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
withCallback(publishMessageAsync(ctx, msg),
|
||||
m -> ctx.tellNext(m, TbRelationTypes.SUCCESS),
|
||||
ctx::tellSuccess,
|
||||
t -> {
|
||||
TbMsg next = processException(ctx, msg, t);
|
||||
ctx.tellFailure(next, t);
|
||||
|
||||
@ -113,7 +113,7 @@ class TbHttpClient {
|
||||
queueProcessor.resetCounter();
|
||||
}
|
||||
TbMsg next = processResponse(ctx, msg, responseEntity);
|
||||
ctx.tellNext(next, TbRelationTypes.SUCCESS);
|
||||
ctx.tellSuccess(next);
|
||||
} else {
|
||||
if (config.isUseRedisQueueForMsgPersistence()) {
|
||||
processHttpClientError(responseEntity.getStatusCode(), msg, queueProcessor);
|
||||
|
||||
@ -59,6 +59,7 @@ public class TbSendRPCReplyNode implements TbNode {
|
||||
ctx.tellFailure(msg, new RuntimeException("Request body is empty!"));
|
||||
} else {
|
||||
ctx.getRpcService().sendRpcReplyToDevice(new DeviceId(msg.getOriginator().getId()), Integer.parseInt(requestIdStr), msg.getData());
|
||||
ctx.tellSuccess(msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -112,13 +112,15 @@ public class TbSendRPCRequestNode implements TbNode {
|
||||
|
||||
ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> {
|
||||
if (!ruleEngineDeviceRpcResponse.getError().isPresent()) {
|
||||
TbMsg next = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
|
||||
ctx.tellNext(next, TbRelationTypes.SUCCESS);
|
||||
TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
|
||||
ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS);
|
||||
} else {
|
||||
TbMsg next = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
|
||||
TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
|
||||
ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name()));
|
||||
ctx.enqueueForTellNext(next, TbRelationTypes.FAILURE);
|
||||
}
|
||||
});
|
||||
ctx.ack(msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -61,7 +61,6 @@ public class TbMsgAttributesNode implements TbNode {
|
||||
ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
|
||||
return;
|
||||
}
|
||||
|
||||
String src = msg.getData();
|
||||
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src));
|
||||
ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), config.getScope(), new ArrayList<>(attributes), new TelemetryNodeCallback(ctx, msg));
|
||||
|
||||
@ -74,7 +74,7 @@ public class TbMsgTimeseriesNode implements TbNode {
|
||||
}
|
||||
String src = msg.getData();
|
||||
Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);
|
||||
if (tsKvMap == null) {
|
||||
if (tsKvMap.isEmpty()) {
|
||||
ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + src));
|
||||
return;
|
||||
}
|
||||
@ -85,7 +85,7 @@ public class TbMsgTimeseriesNode implements TbNode {
|
||||
}
|
||||
}
|
||||
String ttlValue = msg.getMetaData().getValue("TTL");
|
||||
long ttl = !StringUtils.isEmpty(ttlValue) ? Long.valueOf(ttlValue) : config.getDefaultTTL();
|
||||
long ttl = !StringUtils.isEmpty(ttlValue) ? Long.parseLong(ttlValue) : config.getDefaultTTL();
|
||||
ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg));
|
||||
}
|
||||
|
||||
|
||||
@ -22,8 +22,6 @@ import org.thingsboard.server.common.msg.TbMsg;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 02.04.18.
|
||||
*/
|
||||
@ -34,7 +32,7 @@ class TelemetryNodeCallback implements FutureCallback<Void> {
|
||||
|
||||
@Override
|
||||
public void onSuccess(@Nullable Void result) {
|
||||
ctx.tellNext(msg, SUCCESS);
|
||||
ctx.tellSuccess(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -55,7 +55,7 @@ public abstract class TbAbstractTransformNode implements TbNode {
|
||||
|
||||
protected void transformSuccess(TbContext ctx, TbMsg msg, TbMsg m) {
|
||||
if (m != null) {
|
||||
ctx.tellNext(m, SUCCESS);
|
||||
ctx.tellSuccess(m);
|
||||
} else {
|
||||
ctx.tellNext(msg, FAILURE);
|
||||
}
|
||||
|
||||
@ -26,18 +26,19 @@ import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.thingsboard.common.util.ListeningExecutor;
|
||||
import org.thingsboard.rule.engine.api.*;
|
||||
import org.thingsboard.rule.engine.api.ScriptEngine;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.RuleNodeId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.DataType;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgDataType;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
@ -47,13 +48,26 @@ import javax.script.ScriptException;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.thingsboard.rule.engine.action.TbAbstractAlarmNode.*;
|
||||
import static org.mockito.Mockito.anyLong;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.eq;
|
||||
import static org.mockito.Mockito.same;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.thingsboard.rule.engine.action.TbAbstractAlarmNode.IS_CLEARED_ALARM;
|
||||
import static org.thingsboard.rule.engine.action.TbAbstractAlarmNode.IS_EXISTING_ALARM;
|
||||
import static org.thingsboard.rule.engine.action.TbAbstractAlarmNode.IS_NEW_ALARM;
|
||||
import static org.thingsboard.server.common.data.alarm.AlarmSeverity.CRITICAL;
|
||||
import static org.thingsboard.server.common.data.alarm.AlarmSeverity.WARNING;
|
||||
import static org.thingsboard.server.common.data.alarm.AlarmStatus.*;
|
||||
import static org.thingsboard.server.common.data.alarm.AlarmStatus.ACTIVE_UNACK;
|
||||
import static org.thingsboard.server.common.data.alarm.AlarmStatus.CLEARED_ACK;
|
||||
import static org.thingsboard.server.common.data.alarm.AlarmStatus.CLEARED_UNACK;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class TbAlarmNodeTest {
|
||||
|
||||
@ -19,7 +19,6 @@ import com.datastax.driver.core.utils.UUIDs;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
@ -28,7 +27,10 @@ import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.thingsboard.common.util.ListeningExecutor;
|
||||
import org.thingsboard.rule.engine.api.*;
|
||||
import org.thingsboard.rule.engine.api.ScriptEngine;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.RuleNodeId;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
@ -39,7 +41,10 @@ import javax.script.ScriptException;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.same;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class TbJsFilterNodeTest {
|
||||
|
||||
@ -28,7 +28,10 @@ import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.thingsboard.common.util.ListeningExecutor;
|
||||
import org.thingsboard.rule.engine.api.*;
|
||||
import org.thingsboard.rule.engine.api.ScriptEngine;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.RuleNodeId;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
@ -41,7 +44,9 @@ import java.util.concurrent.Callable;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Matchers.same;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class TbJsSwitchNodeTest {
|
||||
|
||||
@ -37,7 +37,6 @@ import java.io.IOException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
|
||||
@ -31,8 +31,17 @@ import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.User;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.id.*;
|
||||
import org.thingsboard.server.common.data.kv.*;
|
||||
import org.thingsboard.server.common.data.id.AssetId;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.RuleNodeId;
|
||||
import org.thingsboard.server.common.data.id.UserId;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgDataType;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
|
||||
@ -27,7 +27,10 @@ import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.thingsboard.common.util.ListeningExecutor;
|
||||
import org.thingsboard.rule.engine.api.*;
|
||||
import org.thingsboard.rule.engine.api.ScriptEngine;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.RuleNodeId;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
@ -39,7 +42,10 @@ import java.util.concurrent.Callable;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Matchers.same;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.eq;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user