use FAILURE chain in Customer/Tenant/Related entity not found
fetch telemetry and attributes in get node
This commit is contained in:
parent
81d38c567a
commit
93272bed6f
@ -1,12 +1,12 @@
|
|||||||
/**
|
/**
|
||||||
* Copyright © 2016-2018 The Thingsboard Authors
|
* Copyright © 2016-2018 The Thingsboard Authors
|
||||||
*
|
* <p>
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
* You may obtain a copy of the License at
|
* You may obtain a copy of the License at
|
||||||
*
|
* <p>
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
* <p>
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
@ -18,8 +18,12 @@ package org.thingsboard.rule.engine.metadata;
|
|||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.rule.engine.TbNodeUtils;
|
import org.thingsboard.rule.engine.TbNodeUtils;
|
||||||
import org.thingsboard.rule.engine.api.*;
|
import org.thingsboard.rule.engine.api.TbContext;
|
||||||
|
import org.thingsboard.rule.engine.api.TbNode;
|
||||||
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||||
|
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||||
@ -30,9 +34,11 @@ import java.util.List;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
|
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
|
||||||
|
import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
|
||||||
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
|
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
|
||||||
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
|
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode {
|
public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode {
|
||||||
|
|
||||||
private TbGetEntityAttrNodeConfiguration config;
|
private TbGetEntityAttrNodeConfiguration config;
|
||||||
@ -47,17 +53,24 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
|
|||||||
try {
|
try {
|
||||||
withCallback(
|
withCallback(
|
||||||
findEntityAsync(ctx, msg.getOriginator()),
|
findEntityAsync(ctx, msg.getOriginator()),
|
||||||
entityId -> withCallback(
|
entityId -> safeGetAttributes(ctx, msg, entityId),
|
||||||
config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId),
|
|
||||||
attributes -> putAttributesAndTell(ctx, msg, attributes),
|
|
||||||
t -> ctx.tellError(msg, t)
|
|
||||||
),
|
|
||||||
t -> ctx.tellError(msg, t));
|
t -> ctx.tellError(msg, t));
|
||||||
} catch (Throwable th) {
|
} catch (Throwable th) {
|
||||||
ctx.tellError(msg, th);
|
ctx.tellError(msg, th);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void safeGetAttributes(TbContext ctx, TbMsg msg, T entityId) {
|
||||||
|
if(entityId == null || entityId.isNullUid()) {
|
||||||
|
ctx.tellNext(msg, FAILURE);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
withCallback(config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId),
|
||||||
|
attributes -> putAttributesAndTell(ctx, msg, attributes),
|
||||||
|
t -> ctx.tellError(msg, t));
|
||||||
|
}
|
||||||
|
|
||||||
private ListenableFuture<List<KvEntry>> getAttributesAsync(TbContext ctx, EntityId entityId) {
|
private ListenableFuture<List<KvEntry>> getAttributesAsync(TbContext ctx, EntityId entityId) {
|
||||||
ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, SERVER_SCOPE, config.getAttrMapping().keySet());
|
ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, SERVER_SCOPE, config.getAttrMapping().keySet());
|
||||||
return Futures.transform(latest, (Function<? super List<AttributeKvEntry>, ? extends List<KvEntry>>) l ->
|
return Futures.transform(latest, (Function<? super List<AttributeKvEntry>, ? extends List<KvEntry>>) l ->
|
||||||
|
|||||||
@ -58,23 +58,17 @@ public class TbGetAttributesNode implements TbNode {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
|
public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
|
||||||
// todo-vp: both telemetry and attributes should be processes
|
ListenableFuture<List<Void>> allFutures = Futures.allAsList(
|
||||||
if (CollectionUtils.isNotEmpty(config.getLatestTsKeyNames())) {
|
putLatestTelemetry(ctx, msg, config.getLatestTsKeyNames()),
|
||||||
withCallback(getLatestTelemetry(ctx, msg, config.getLatestTsKeyNames()),
|
|
||||||
i -> ctx.tellNext(msg, SUCCESS),
|
|
||||||
t -> ctx.tellError(msg, t));
|
|
||||||
} else {
|
|
||||||
ListenableFuture<List<Void>> future = Futures.allAsList(
|
|
||||||
putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs_"),
|
putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs_"),
|
||||||
putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared_"),
|
putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared_"),
|
||||||
putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss_"));
|
putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss_")
|
||||||
|
);
|
||||||
withCallback(future, i -> ctx.tellNext(msg, SUCCESS), t -> ctx.tellError(msg, t));
|
withCallback(allFutures, i -> ctx.tellNext(msg, SUCCESS), t -> ctx.tellError(msg, t));
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<Void> putAttrAsync(TbContext ctx, TbMsg msg, String scope, List<String> keys, String prefix) {
|
private ListenableFuture<Void> putAttrAsync(TbContext ctx, TbMsg msg, String scope, List<String> keys, String prefix) {
|
||||||
if (keys == null) {
|
if (CollectionUtils.isEmpty(keys)) {
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(msg.getOriginator(), scope, keys);
|
ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(msg.getOriginator(), scope, keys);
|
||||||
@ -84,8 +78,8 @@ public class TbGetAttributesNode implements TbNode {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<Void> getLatestTelemetry(TbContext ctx, TbMsg msg, List<String> keys) {
|
private ListenableFuture<Void> putLatestTelemetry(TbContext ctx, TbMsg msg, List<String> keys) {
|
||||||
if (keys == null) {
|
if (CollectionUtils.isEmpty(keys)) {
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(msg.getOriginator(), keys);
|
ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(msg.getOriginator(), keys);
|
||||||
|
|||||||
@ -45,6 +45,6 @@ public class EntitiesCustomerIdAsyncLoader {
|
|||||||
private static <T extends HasCustomerId> ListenableFuture<CustomerId> getCustomerAsync(ListenableFuture<T> future) {
|
private static <T extends HasCustomerId> ListenableFuture<CustomerId> getCustomerAsync(ListenableFuture<T> future) {
|
||||||
return Futures.transform(future, (AsyncFunction<HasCustomerId, CustomerId>) in -> {
|
return Futures.transform(future, (AsyncFunction<HasCustomerId, CustomerId>) in -> {
|
||||||
return in != null ? Futures.immediateFuture(in.getCustomerId())
|
return in != null ? Futures.immediateFuture(in.getCustomerId())
|
||||||
: Futures.immediateFailedFuture(new IllegalStateException("Customer not found"));});
|
: Futures.immediateFuture(null);});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -40,11 +40,11 @@ public class EntitiesRelatedEntityIdAsyncLoader {
|
|||||||
if (relationsQuery.getDirection() == EntitySearchDirection.FROM) {
|
if (relationsQuery.getDirection() == EntitySearchDirection.FROM) {
|
||||||
return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
|
return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
|
||||||
r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
|
r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
|
||||||
: Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
|
: Futures.immediateFuture(null));
|
||||||
} else if (relationsQuery.getDirection() == EntitySearchDirection.TO) {
|
} else if (relationsQuery.getDirection() == EntitySearchDirection.TO) {
|
||||||
return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
|
return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
|
||||||
r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
|
r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
|
||||||
: Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
|
: Futures.immediateFuture(null));
|
||||||
}
|
}
|
||||||
return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction"));
|
return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction"));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -53,6 +53,6 @@ public class EntitiesTenantIdAsyncLoader {
|
|||||||
private static <T extends HasTenantId> ListenableFuture<TenantId> getTenantAsync(ListenableFuture<T> future) {
|
private static <T extends HasTenantId> ListenableFuture<TenantId> getTenantAsync(ListenableFuture<T> future) {
|
||||||
return Futures.transform(future, (AsyncFunction<HasTenantId, TenantId>) in -> {
|
return Futures.transform(future, (AsyncFunction<HasTenantId, TenantId>) in -> {
|
||||||
return in != null ? Futures.immediateFuture(in.getTenantId())
|
return in != null ? Futures.immediateFuture(in.getTenantId())
|
||||||
: Futures.immediateFailedFuture(new IllegalStateException("Tenant not found"));});
|
: Futures.immediateFuture(null);});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,12 +1,12 @@
|
|||||||
/**
|
/**
|
||||||
* Copyright © 2016-2018 The Thingsboard Authors
|
* Copyright © 2016-2018 The Thingsboard Authors
|
||||||
*
|
* <p>
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
* You may obtain a copy of the License at
|
* You may obtain a copy of the License at
|
||||||
*
|
* <p>
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
* <p>
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
@ -31,12 +31,7 @@ import org.thingsboard.rule.engine.api.TbNodeException;
|
|||||||
import org.thingsboard.server.common.data.Device;
|
import org.thingsboard.server.common.data.Device;
|
||||||
import org.thingsboard.server.common.data.User;
|
import org.thingsboard.server.common.data.User;
|
||||||
import org.thingsboard.server.common.data.asset.Asset;
|
import org.thingsboard.server.common.data.asset.Asset;
|
||||||
import org.thingsboard.server.common.data.id.AssetId;
|
import org.thingsboard.server.common.data.id.*;
|
||||||
import org.thingsboard.server.common.data.id.CustomerId;
|
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
|
||||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
|
||||||
import org.thingsboard.server.common.data.id.RuleNodeId;
|
|
||||||
import org.thingsboard.server.common.data.id.UserId;
|
|
||||||
import org.thingsboard.server.common.data.kv.*;
|
import org.thingsboard.server.common.data.kv.*;
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
@ -56,6 +51,7 @@ import static org.junit.Assert.assertTrue;
|
|||||||
import static org.mockito.Matchers.same;
|
import static org.mockito.Matchers.same;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
|
||||||
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
|
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
|
||||||
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
|
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
|
||||||
|
|
||||||
@ -149,7 +145,7 @@ public class TbGetCustomerAttributeNodeTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void errorThrownIfCustomerCannotBeFound() {
|
public void failedChainUsedIfCustomerCannotBeFound() {
|
||||||
UserId userId = new UserId(UUIDs.timeBased());
|
UserId userId = new UserId(UUIDs.timeBased());
|
||||||
CustomerId customerId = new CustomerId(UUIDs.timeBased());
|
CustomerId customerId = new CustomerId(UUIDs.timeBased());
|
||||||
User user = new User();
|
User user = new User();
|
||||||
@ -160,13 +156,9 @@ public class TbGetCustomerAttributeNodeTest {
|
|||||||
when(ctx.getUserService()).thenReturn(userService);
|
when(ctx.getUserService()).thenReturn(userService);
|
||||||
when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(null));
|
when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(null));
|
||||||
|
|
||||||
node.onMsg(ctx, msg);
|
|
||||||
final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
|
|
||||||
verify(ctx).tellError(same(msg), captor.capture());
|
|
||||||
|
|
||||||
Throwable value = captor.getValue();
|
node.onMsg(ctx, msg);
|
||||||
assertEquals(IllegalStateException.class, value.getClass());
|
verify(ctx).tellNext(msg, FAILURE);
|
||||||
assertEquals("Customer not found", value.getMessage());
|
|
||||||
assertTrue(msg.getMetaData().getData().isEmpty());
|
assertTrue(msg.getMetaData().getData().isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user