refactoring and fix bugs
This commit is contained in:
parent
6d657fed46
commit
3fec3aa21c
@ -67,30 +67,28 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
|
||||
return;
|
||||
}
|
||||
|
||||
withCallback(config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId),
|
||||
withCallback(config.isTelemetry() ? getLatestTelemetry(ctx, entityId, msg) : getAttributesAsync(ctx, entityId, msg),
|
||||
attributes -> putAttributesAndTell(ctx, msg, attributes),
|
||||
t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
private ListenableFuture<List<KvEntry>> getAttributesAsync(TbContext ctx, EntityId entityId) {
|
||||
ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(ctx.getTenantId(), entityId, SERVER_SCOPE, config.getAttrMapping().keySet());
|
||||
private ListenableFuture<List<KvEntry>> getAttributesAsync(TbContext ctx, EntityId entityId, TbMsg msg) {
|
||||
List<String> attrProcess = TbNodeUtils.processPatterns(List.copyOf(config.getAttrMapping().keySet()), msg);
|
||||
ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(ctx.getTenantId(), entityId, SERVER_SCOPE, attrProcess);
|
||||
return Futures.transform(latest, l ->
|
||||
l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()), MoreExecutors.directExecutor());
|
||||
}
|
||||
|
||||
private ListenableFuture<List<KvEntry>> getLatestTelemetry(TbContext ctx, EntityId entityId) {
|
||||
ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(ctx.getTenantId(), entityId, config.getAttrMapping().keySet());
|
||||
private ListenableFuture<List<KvEntry>> getLatestTelemetry(TbContext ctx, EntityId entityId, TbMsg msg) {
|
||||
List<String> latestProcess = TbNodeUtils.processPatterns(List.copyOf(config.getAttrMapping().keySet()), msg);
|
||||
ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(ctx.getTenantId(), entityId, latestProcess);
|
||||
return Futures.transform(latest, l ->
|
||||
l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()), MoreExecutors.directExecutor());
|
||||
}
|
||||
|
||||
|
||||
private void putAttributesAndTell(TbContext ctx, TbMsg msg, List<? extends KvEntry> attributes) {
|
||||
log.info("attr " + attributes.toString());
|
||||
log.info("conf attr " + config.getAttrMapping().toString());
|
||||
List<String> attrProcessPattern = new ArrayList<>();
|
||||
log.info("msg {}", msg);
|
||||
log.info("result process {}", attrProcessPattern);
|
||||
Map<String, String> updConf = new HashMap<>();
|
||||
config.getAttrMapping().forEach((key, value) -> {
|
||||
String processPattern = TbNodeUtils.processPattern(key, msg);
|
||||
@ -99,16 +97,9 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
|
||||
});
|
||||
|
||||
attributes.forEach(r -> {
|
||||
log.info("r {}", r);
|
||||
log.info("rkey {}", r.getKey());
|
||||
log.info("index {}", attrProcessPattern.indexOf(r.getKey()));
|
||||
log.info("getByKey {}", updConf.get(r.getKey()));
|
||||
String attrName = updConf.get(r.getKey());
|
||||
log.info("attrName {}", attrName);
|
||||
msg.getMetaData().putValue(attrName, r.getValueAsString());
|
||||
log.info(msg.getMetaData().toString());
|
||||
});
|
||||
log.info(msg.toString());
|
||||
ctx.tellSuccess(msg);
|
||||
}
|
||||
|
||||
|
||||
@ -51,7 +51,6 @@ import org.thingsboard.server.dao.device.DeviceService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.dao.user.UserService;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -59,6 +58,7 @@ import java.util.Map;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyCollection;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.same;
|
||||
import static org.mockito.Mockito.verify;
|
||||
@ -69,11 +69,11 @@ import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class TbGetCustomerAttributeNodeTest {
|
||||
|
||||
private final RuleChainId ruleChainId = new RuleChainId(Uuids.timeBased());
|
||||
private final RuleNodeId ruleNodeId = new RuleNodeId(Uuids.timeBased());
|
||||
private TbGetCustomerAttributeNode node;
|
||||
|
||||
@Mock
|
||||
private TbContext ctx;
|
||||
|
||||
@Mock
|
||||
private AttributesService attributesService;
|
||||
@Mock
|
||||
@ -84,13 +84,9 @@ public class TbGetCustomerAttributeNodeTest {
|
||||
private AssetService assetService;
|
||||
@Mock
|
||||
private DeviceService deviceService;
|
||||
|
||||
private TbMsg msg;
|
||||
private Map metaData;
|
||||
|
||||
private final RuleChainId ruleChainId = new RuleChainId(Uuids.timeBased());
|
||||
private final RuleNodeId ruleNodeId = new RuleNodeId(Uuids.timeBased());
|
||||
|
||||
@Before
|
||||
public void init() throws TbNodeException {
|
||||
TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration();
|
||||
@ -121,7 +117,7 @@ public class TbGetCustomerAttributeNodeTest {
|
||||
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user));
|
||||
|
||||
when(ctx.getAttributesService()).thenReturn(attributesService);
|
||||
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
|
||||
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), anyCollection()))
|
||||
.thenThrow(new IllegalStateException("something wrong"));
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
@ -146,7 +142,7 @@ public class TbGetCustomerAttributeNodeTest {
|
||||
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user));
|
||||
|
||||
when(ctx.getAttributesService()).thenReturn(attributesService);
|
||||
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
|
||||
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), anyCollection()))
|
||||
.thenReturn(Futures.immediateFailedFuture(new IllegalStateException("something wrong")));
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
@ -256,7 +252,7 @@ public class TbGetCustomerAttributeNodeTest {
|
||||
List<TsKvEntry> timeseries = Lists.newArrayList(new BasicTsKvEntry(1L, new StringDataEntry("temperature", "highest")));
|
||||
|
||||
when(ctx.getTimeseriesService()).thenReturn(timeseriesService);
|
||||
when(timeseriesService.findLatest(any(), eq(customerId), eq(Collections.singleton("${word}"))))
|
||||
when(timeseriesService.findLatest(any(), eq(customerId), anyCollection()))
|
||||
.thenReturn(Futures.immediateFuture(timeseries));
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
@ -268,7 +264,7 @@ public class TbGetCustomerAttributeNodeTest {
|
||||
List<AttributeKvEntry> attributes = Lists.newArrayList(new BaseAttributeKvEntry(new StringDataEntry("temperature", "high"), 1L));
|
||||
|
||||
when(ctx.getAttributesService()).thenReturn(attributesService);
|
||||
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
|
||||
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), anyCollection()))
|
||||
.thenReturn(Futures.immediateFuture(attributes));
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
@ -53,7 +53,6 @@ import org.thingsboard.server.dao.relation.RelationService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.dao.user.UserService;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -61,6 +60,7 @@ import java.util.Map;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyCollection;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.same;
|
||||
import static org.mockito.Mockito.verify;
|
||||
@ -129,7 +129,7 @@ public class TbGetRelatedAttributeNodeTest {
|
||||
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user));
|
||||
|
||||
when(ctx.getAttributesService()).thenReturn(attributesService);
|
||||
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
|
||||
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), anyCollection()))
|
||||
.thenThrow(new IllegalStateException("something wrong"));
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
@ -156,7 +156,7 @@ public class TbGetRelatedAttributeNodeTest {
|
||||
|
||||
when(ctx.getAttributesService()).thenReturn(attributesService);
|
||||
|
||||
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
|
||||
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), anyCollection()))
|
||||
.thenReturn(Futures.immediateFailedFuture(new IllegalStateException("something wrong")));
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
@ -277,7 +277,7 @@ public class TbGetRelatedAttributeNodeTest {
|
||||
List<TsKvEntry> timeseries = Lists.newArrayList(new BasicTsKvEntry(1L, new StringDataEntry("temperature", "highest")));
|
||||
|
||||
when(ctx.getTimeseriesService()).thenReturn(timeseriesService);
|
||||
when(timeseriesService.findLatest(any(), eq(customerId), eq(Collections.singleton("${word}"))))
|
||||
when(timeseriesService.findLatest(any(), eq(customerId), anyCollection()))
|
||||
.thenReturn(Futures.immediateFuture(timeseries));
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
@ -289,7 +289,7 @@ public class TbGetRelatedAttributeNodeTest {
|
||||
List<AttributeKvEntry> attributes = Lists.newArrayList(new BaseAttributeKvEntry(new StringDataEntry("temperature", "high"), 1L));
|
||||
|
||||
when(ctx.getAttributesService()).thenReturn(attributesService);
|
||||
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
|
||||
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), anyCollection()))
|
||||
.thenReturn(Futures.immediateFuture(attributes));
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
@ -52,7 +52,6 @@ import org.thingsboard.server.dao.device.DeviceService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.dao.user.UserService;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -60,6 +59,7 @@ import java.util.Map;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyCollection;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.same;
|
||||
import static org.mockito.Mockito.verify;
|
||||
@ -69,11 +69,11 @@ import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.Silent.class)
|
||||
public class TbGetTenantAttributeNodeTest {
|
||||
private final RuleChainId ruleChainId = new RuleChainId(Uuids.timeBased());
|
||||
private final RuleNodeId ruleNodeId = new RuleNodeId(Uuids.timeBased());
|
||||
private TbGetTenantAttributeNode node;
|
||||
|
||||
@Mock
|
||||
private TbContext ctx;
|
||||
|
||||
@Mock
|
||||
private AttributesService attributesService;
|
||||
@Mock
|
||||
@ -84,13 +84,9 @@ public class TbGetTenantAttributeNodeTest {
|
||||
private AssetService assetService;
|
||||
@Mock
|
||||
private DeviceService deviceService;
|
||||
|
||||
private TbMsg msg;
|
||||
private Map metaData;
|
||||
|
||||
private final RuleChainId ruleChainId = new RuleChainId(Uuids.timeBased());
|
||||
private final RuleNodeId ruleNodeId = new RuleNodeId(Uuids.timeBased());
|
||||
|
||||
@Before
|
||||
public void init() throws TbNodeException {
|
||||
TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration();
|
||||
@ -121,7 +117,7 @@ public class TbGetTenantAttributeNodeTest {
|
||||
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user));
|
||||
|
||||
when(ctx.getAttributesService()).thenReturn(attributesService);
|
||||
when(attributesService.find(any(), eq(tenantId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
|
||||
when(attributesService.find(any(), eq(tenantId), eq(SERVER_SCOPE), anyCollection()))
|
||||
.thenThrow(new IllegalStateException("something wrong"));
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
@ -146,7 +142,7 @@ public class TbGetTenantAttributeNodeTest {
|
||||
when(userService.findUserByIdAsync(any(), eq(userId))).thenReturn(Futures.immediateFuture(user));
|
||||
|
||||
when(ctx.getAttributesService()).thenReturn(attributesService);
|
||||
when(attributesService.find(any(), eq(tenantId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
|
||||
when(attributesService.find(any(), eq(tenantId), eq(SERVER_SCOPE), anyCollection()))
|
||||
.thenReturn(Futures.immediateFailedFuture(new IllegalStateException("something wrong")));
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
@ -256,7 +252,7 @@ public class TbGetTenantAttributeNodeTest {
|
||||
List<TsKvEntry> timeseries = Lists.newArrayList(new BasicTsKvEntry(1L, new StringDataEntry("temperature", "highest")));
|
||||
|
||||
when(ctx.getTimeseriesService()).thenReturn(timeseriesService);
|
||||
when(timeseriesService.findLatest(any(), eq(tenantId), eq(Collections.singleton("${word}"))))
|
||||
when(timeseriesService.findLatest(any(), eq(tenantId), anyCollection()))
|
||||
.thenReturn(Futures.immediateFuture(timeseries));
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
@ -268,7 +264,7 @@ public class TbGetTenantAttributeNodeTest {
|
||||
List<AttributeKvEntry> attributes = Lists.newArrayList(new BaseAttributeKvEntry(new StringDataEntry("temperature", "high"), 1L));
|
||||
|
||||
when(ctx.getAttributesService()).thenReturn(attributesService);
|
||||
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), eq(Collections.singleton("${word}"))))
|
||||
when(attributesService.find(any(), eq(customerId), eq(SERVER_SCOPE), anyCollection()))
|
||||
.thenReturn(Futures.immediateFuture(attributes));
|
||||
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user