add Timeseries GetAttribute node

This commit is contained in:
vparomskiy 2018-03-19 19:33:04 +02:00
parent aec6b8c223
commit e68ee8b299
4 changed files with 65 additions and 5 deletions

View File

@ -25,6 +25,7 @@ import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.plugin.PluginService; import org.thingsboard.server.dao.plugin.PluginService;
import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.rule.RuleService; import org.thingsboard.server.dao.rule.RuleService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.user.UserService;
import java.util.UUID; import java.util.UUID;
@ -68,4 +69,6 @@ public interface TbContext {
RuleChainService getRuleChainService(); RuleChainService getRuleChainService();
TimeseriesService getTimeseriesService();
} }

View File

@ -15,14 +15,19 @@
*/ */
package org.thingsboard.rule.engine.metadata; package org.thingsboard.rule.engine.metadata;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.TbNodeUtils; import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.*; import org.thingsboard.rule.engine.api.*;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback; import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE; import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
@ -42,7 +47,7 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
withCallback( withCallback(
findEntityAsync(ctx, msg.getOriginator()), findEntityAsync(ctx, msg.getOriginator()),
entityId -> withCallback( entityId -> withCallback(
ctx.getAttributesService().find(entityId, SERVER_SCOPE, config.getAttrMapping().keySet()), config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId),
attributes -> putAttributesAndTell(ctx, msg, attributes), attributes -> putAttributesAndTell(ctx, msg, attributes),
t -> ctx.tellError(msg, t) t -> ctx.tellError(msg, t)
), ),
@ -52,7 +57,20 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
} }
} }
private void putAttributesAndTell(TbContext ctx, TbMsg msg, List<AttributeKvEntry> attributes) { private ListenableFuture<List<KvEntry>> getAttributesAsync(TbContext ctx, EntityId entityId) {
ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, SERVER_SCOPE, config.getAttrMapping().keySet());
return Futures.transform(latest, (Function<? super List<AttributeKvEntry>, ? extends List<KvEntry>>) l ->
l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()));
}
private ListenableFuture<List<KvEntry>> getLatestTelemetry(TbContext ctx, EntityId entityId) {
ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(entityId, config.getAttrMapping().keySet());
return Futures.transform(latest, (Function<? super List<TsKvEntry>, ? extends List<KvEntry>>) l ->
l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()));
}
private void putAttributesAndTell(TbContext ctx, TbMsg msg, List<KvEntry> attributes) {
attributes.forEach(r -> { attributes.forEach(r -> {
String attrName = config.getAttrMapping().get(r.getKey()); String attrName = config.getAttrMapping().get(r.getKey());
msg.getMetaData().putValue(attrName, r.getValueAsString()); msg.getMetaData().putValue(attrName, r.getValueAsString());

View File

@ -24,4 +24,5 @@ import java.util.Optional;
public class TbGetEntityAttrNodeConfiguration { public class TbGetEntityAttrNodeConfiguration {
private Map<String, String> attrMapping; private Map<String, String> attrMapping;
private boolean isTelemetry = false;
} }

View File

@ -35,14 +35,13 @@ import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.user.UserService;
import java.util.Collections; import java.util.Collections;
@ -68,6 +67,8 @@ public class TbGetCustomerAttributeNodeTest {
@Mock @Mock
private AttributesService attributesService; private AttributesService attributesService;
@Mock @Mock
private TimeseriesService timeseriesService;
@Mock
private UserService userService; private UserService userService;
@Mock @Mock
private AssetService assetService; private AssetService assetService;
@ -82,6 +83,7 @@ public class TbGetCustomerAttributeNodeTest {
Map<String, String> attrMapping = new HashMap<>(); Map<String, String> attrMapping = new HashMap<>();
attrMapping.putIfAbsent("temperature", "tempo"); attrMapping.putIfAbsent("temperature", "tempo");
config.setAttrMapping(attrMapping); config.setAttrMapping(attrMapping);
config.setTelemetry(false);
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(); TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
nodeConfiguration.setData(mapper.valueToTree(config)); nodeConfiguration.setData(mapper.valueToTree(config));
@ -214,6 +216,42 @@ public class TbGetCustomerAttributeNodeTest {
entityAttributeFetched(customerId); entityAttributeFetched(customerId);
} }
@Test
public void deviceCustomerTelemetryFetched() throws TbNodeException {
TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration();
Map<String, String> attrMapping = new HashMap<>();
attrMapping.putIfAbsent("temperature", "tempo");
config.setAttrMapping(attrMapping);
config.setTelemetry(true);
ObjectMapper mapper = new ObjectMapper();
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
nodeConfiguration.setData(mapper.valueToTree(config));
node = new TbGetCustomerAttributeNode();
node.init(nodeConfiguration, null);
DeviceId deviceId = new DeviceId(UUIDs.timeBased());
CustomerId customerId = new CustomerId(UUIDs.timeBased());
Device device = new Device();
device.setCustomerId(customerId);
msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), new byte[4]);
when(ctx.getDeviceService()).thenReturn(deviceService);
when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device));
List<TsKvEntry> timeseries = Lists.newArrayList(new BasicTsKvEntry(1L, new StringDataEntry("temperature", "highest")));
when(ctx.getTimeseriesService()).thenReturn(timeseriesService);
when(timeseriesService.findLatest(customerId, Collections.singleton("temperature")))
.thenReturn(Futures.immediateFuture(timeseries));
node.onMsg(ctx, msg);
verify(ctx).tellNext(msg);
assertEquals(msg.getMetaData().getValue("tempo"), "highest");
}
private void entityAttributeFetched(CustomerId customerId) { private void entityAttributeFetched(CustomerId customerId) {
List<AttributeKvEntry> attributes = Lists.newArrayList(new BaseAttributeKvEntry(new StringDataEntry("temperature", "high"), 1L)); List<AttributeKvEntry> attributes = Lists.newArrayList(new BaseAttributeKvEntry(new StringDataEntry("temperature", "high"), 1L));