use FAILURE chain in Customer/Tenant/Related entity not found in Change Originator Node

This commit is contained in:
vparomskiy 2018-05-11 12:08:35 +03:00
parent 93272bed6f
commit 61baafe6fb
3 changed files with 38 additions and 22 deletions

View File

@ -1,12 +1,12 @@
/** /**
* Copyright © 2016-2018 The Thingsboard Authors * Copyright © 2016-2018 The Thingsboard Authors
* * <p>
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* * <p>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -18,10 +18,14 @@ package org.thingsboard.rule.engine.transform;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.TbNodeUtils; import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.*; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback; import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
/** /**
@ -40,7 +44,13 @@ public abstract class TbAbstractTransformNode implements TbNode {
@Override @Override
public void onMsg(TbContext ctx, TbMsg msg) { public void onMsg(TbContext ctx, TbMsg msg) {
withCallback(transform(ctx, msg), withCallback(transform(ctx, msg),
m -> ctx.tellNext(m, SUCCESS), m -> {
if (m != null) {
ctx.tellNext(m, SUCCESS);
} else {
ctx.tellNext(msg, FAILURE);
}
},
t -> ctx.tellError(msg, t)); t -> ctx.tellError(msg, t));
} }

View File

@ -1,12 +1,12 @@
/** /**
* Copyright © 2016-2018 The Thingsboard Authors * Copyright © 2016-2018 The Thingsboard Authors
* * <p>
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* * <p>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -21,7 +21,10 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.TbNodeUtils; import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.*; import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader; import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader;
import org.thingsboard.rule.engine.util.EntitiesRelatedEntityIdAsyncLoader; import org.thingsboard.rule.engine.util.EntitiesRelatedEntityIdAsyncLoader;
import org.thingsboard.rule.engine.util.EntitiesTenantIdAsyncLoader; import org.thingsboard.rule.engine.util.EntitiesTenantIdAsyncLoader;
@ -61,7 +64,12 @@ public class TbChangeOriginatorNode extends TbAbstractTransformNode {
@Override @Override
protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) { protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) {
ListenableFuture<? extends EntityId> newOriginator = getNewOriginator(ctx, msg.getOriginator()); ListenableFuture<? extends EntityId> newOriginator = getNewOriginator(ctx, msg.getOriginator());
return Futures.transform(newOriginator, (Function<EntityId, TbMsg>) n -> ctx.transformMsg(msg, msg.getType(), n, msg.getMetaData(), msg.getData())); return Futures.transform(newOriginator, (Function<EntityId, TbMsg>) n -> {
if (n == null || n.isNullUid()) {
return null;
}
return ctx.transformMsg(msg, msg.getType(), n, msg.getMetaData(), msg.getData());
});
} }
private ListenableFuture<? extends EntityId> getNewOriginator(TbContext ctx, EntityId original) { private ListenableFuture<? extends EntityId> getNewOriginator(TbContext ctx, EntityId original) {

View File

@ -40,6 +40,7 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class TbChangeOriginatorNodeTest { public class TbChangeOriginatorNodeTest {
@ -54,7 +55,7 @@ public class TbChangeOriginatorNodeTest {
@Test @Test
public void originatorCanBeChangedToCustomerId() throws TbNodeException { public void originatorCanBeChangedToCustomerId() throws TbNodeException {
init(false); init();
AssetId assetId = new AssetId(UUIDs.timeBased()); AssetId assetId = new AssetId(UUIDs.timeBased());
CustomerId customerId = new CustomerId(UUIDs.timeBased()); CustomerId customerId = new CustomerId(UUIDs.timeBased());
Asset asset = new Asset(); Asset asset = new Asset();
@ -82,7 +83,7 @@ public class TbChangeOriginatorNodeTest {
@Test @Test
public void newChainCanBeStarted() throws TbNodeException { public void newChainCanBeStarted() throws TbNodeException {
init(true); init();
AssetId assetId = new AssetId(UUIDs.timeBased()); AssetId assetId = new AssetId(UUIDs.timeBased());
CustomerId customerId = new CustomerId(UUIDs.timeBased()); CustomerId customerId = new CustomerId(UUIDs.timeBased());
Asset asset = new Asset(); Asset asset = new Asset();
@ -109,7 +110,7 @@ public class TbChangeOriginatorNodeTest {
@Test @Test
public void exceptionThrownIfCannotFindNewOriginator() throws TbNodeException { public void exceptionThrownIfCannotFindNewOriginator() throws TbNodeException {
init(true); init();
AssetId assetId = new AssetId(UUIDs.timeBased()); AssetId assetId = new AssetId(UUIDs.timeBased());
CustomerId customerId = new CustomerId(UUIDs.timeBased()); CustomerId customerId = new CustomerId(UUIDs.timeBased());
Asset asset = new Asset(); Asset asset = new Asset();
@ -121,16 +122,13 @@ public class TbChangeOriginatorNodeTest {
TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L); TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
when(ctx.getAssetService()).thenReturn(assetService); when(ctx.getAssetService()).thenReturn(assetService);
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFailedFuture(new IllegalStateException("wrong"))); when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(null));
node.onMsg(ctx, msg); node.onMsg(ctx, msg);
ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class); verify(ctx).tellNext(same(msg), same(FAILURE));
verify(ctx).tellError(same(msg), captor.capture());
Throwable value = captor.getValue();
assertEquals("wrong", value.getMessage());
} }
public void init(boolean startNewChain) throws TbNodeException { public void init() throws TbNodeException {
TbChangeOriginatorNodeConfiguration config = new TbChangeOriginatorNodeConfiguration(); TbChangeOriginatorNodeConfiguration config = new TbChangeOriginatorNodeConfiguration();
config.setOriginatorSource(TbChangeOriginatorNode.CUSTOMER_SOURCE); config.setOriginatorSource(TbChangeOriginatorNode.CUSTOMER_SOURCE);
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();