Added functionality to push latest timeseries values to edge on assing entities to edge

This commit is contained in:
Volodymyr Babak 2022-12-27 16:53:26 +02:00
parent bcdda93de3
commit 031706e1cf
2 changed files with 133 additions and 80 deletions

View File

@ -45,6 +45,7 @@ import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.id.WidgetsBundleId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
@ -52,13 +53,10 @@ import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.relation.RelationsSearchParameters;
import org.thingsboard.server.common.data.widget.WidgetType;
import org.thingsboard.server.common.data.widget.WidgetsBundle;
import org.thingsboard.server.dao.asset.AssetProfileService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.device.DeviceProfileService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.edge.EdgeEventService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg;
@ -92,25 +90,16 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
@Autowired
private AttributesService attributesService;
@Autowired
private TimeseriesService timeseriesService;
@Autowired
private RelationService relationService;
@Autowired
private DeviceService deviceService;
@Autowired
private AssetService assetService;
@Lazy
@Autowired
private TbEntityViewService entityViewService;
@Autowired
private DeviceProfileService deviceProfileService;
@Autowired
private AssetProfileService assetProfileService;
@Autowired
private WidgetsBundleService widgetsBundleService;
@ -141,26 +130,28 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
EntityId entityId = EntityIdFactory.getByTypeAndUuid(
EntityType.valueOf(attributesRequestMsg.getEntityType()),
new UUID(attributesRequestMsg.getEntityIdMSB(), attributesRequestMsg.getEntityIdLSB()));
final EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(entityId.getEntityType());
if (type == null) {
final EdgeEventType entityType = EdgeUtils.getEdgeEventTypeByEntityType(entityId.getEntityType());
if (entityType == null) {
log.warn("[{}] Type doesn't supported {}", tenantId, entityId.getEntityType());
return Futures.immediateFuture(null);
}
SettableFuture<Void> futureToSet = SettableFuture.create();
String scope = attributesRequestMsg.getScope();
ListenableFuture<List<AttributeKvEntry>> findAttrFuture = attributesService.findAll(tenantId, entityId, scope);
Futures.addCallback(findAttrFuture, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<AttributeKvEntry> ssAttributes) {
return Futures.transformAsync(findAttrFuture, ssAttributes -> {
if (ssAttributes == null || ssAttributes.isEmpty()) {
log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId,
edge.getName(),
entityId.getEntityType(),
entityId.getId());
futureToSet.set(null);
return;
return Futures.immediateFuture(null);
}
return processEntityAttributesAndAddToEdgeQueue(tenantId, entityId, edge, entityType, scope, ssAttributes, attributesRequestMsg);
}, dbCallbackExecutorService);
}
private ListenableFuture<Void> processEntityAttributesAndAddToEdgeQueue(TenantId tenantId, EntityId entityId, Edge edge,
EdgeEventType entityType, String scope, List<AttributeKvEntry> ssAttributes,
AttributesRequestMsg attributesRequestMsg) {
try {
Map<String, Object> entityData = new HashMap<>();
ObjectNode attributes = JacksonUtil.OBJECT_MAPPER.createObjectNode();
@ -179,39 +170,49 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
attributes.put(attr.getKey(), attr.getValueAsString());
}
}
ListenableFuture<Void> future;
if (attributes.size() > 0) {
entityData.put("kv", attributes);
entityData.put("scope", scope);
JsonNode body = JacksonUtil.OBJECT_MAPPER.valueToTree(entityData);
log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, body);
ListenableFuture<Void> future = saveEdgeEvent(tenantId, edge.getId(), type, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void unused) {
futureToSet.set(null);
future = saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body);
} else {
future = Futures.immediateFuture(null);
}
@Override
public void onFailure(Throwable throwable) {
String errMsg = String.format("[%s] Failed to save edge event [%s]", edge.getId(), attributesRequestMsg);
log.error(errMsg, throwable);
futureToSet.setException(new RuntimeException(errMsg, throwable));
}
}, dbCallbackExecutorService);
return Futures.transformAsync(future, v -> processLatestTimeseriesAndAddToEdgeQueue(tenantId, entityId, edge, entityType), dbCallbackExecutorService);
} catch (Exception e) {
String errMsg = String.format("[%s] Failed to save attribute updates to the edge [%s]", edge.getId(), attributesRequestMsg);
log.error(errMsg, e);
futureToSet.setException(new RuntimeException(errMsg, e));
return Futures.immediateFailedFuture(new RuntimeException(errMsg, e));
}
}
@Override
public void onFailure(Throwable t) {
String errMsg = String.format("[%s] Can't find attributes [%s]", edge.getId(), attributesRequestMsg);
log.error(errMsg, t);
futureToSet.setException(new RuntimeException(errMsg, t));
private ListenableFuture<Void> processLatestTimeseriesAndAddToEdgeQueue(TenantId tenantId, EntityId entityId, Edge edge,
EdgeEventType entityType) {
ListenableFuture<List<TsKvEntry>> getAllLatestFuture = timeseriesService.findAllLatest(tenantId, entityId);
return Futures.transformAsync(getAllLatestFuture, tsKvEntries -> {
if (tsKvEntries == null || tsKvEntries.isEmpty()) {
log.trace("[{}][{}] No timeseries found for entity {} [{}]", tenantId,
edge.getName(),
entityId.getEntityType(),
entityId.getId());
return Futures.immediateFuture(null);
}
List<ListenableFuture<Void>> futures = new ArrayList<>();
for (TsKvEntry tsKvEntry : tsKvEntries) {
if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(tsKvEntry.getKey())) {
continue;
}
ObjectNode entityBody = JacksonUtil.OBJECT_MAPPER.createObjectNode();
ObjectNode ts = JacksonUtil.OBJECT_MAPPER.createObjectNode();
ts.put(tsKvEntry.getKey(), tsKvEntry.getValueAsString());
entityBody.set("data", ts);
entityBody.put("ts", tsKvEntry.getTs());
futures.add(saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.TIMESERIES_UPDATED, entityId, JacksonUtil.valueToTree(entityBody)));
}
return Futures.transform(Futures.allAsList(futures), v -> null, dbCallbackExecutorService);
}, dbCallbackExecutorService);
return futureToSet;
}
@Override

View File

@ -323,6 +323,9 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
"inactivityTimeout", "3600000");
sendAttributesRequestAndVerify(device, DataConstants.SHARED_SCOPE, "{\"key2\":\"value2\"}",
"key2", "value2");
doDelete("/api/plugins/telemetry/DEVICE/" + device.getUuidId() + "/" + DataConstants.SERVER_SCOPE, "keys","key1, inactivityTimeout");
doDelete("/api/plugins/telemetry/DEVICE/" + device.getUuidId() + "/" + DataConstants.SHARED_SCOPE, "keys", "key2");
}
@Test
@ -640,4 +643,53 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
client.disconnect();
}
@Test
public void testVerifyDeliveryOfLatestTimeseriesOnAttributesRequest() throws Exception {
Device device = findDeviceByName("Edge Device 1");
JsonNode timeseriesData = mapper.readTree("{\"temperature\":25}");
doPost("/api/plugins/telemetry/DEVICE/" + device.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE,
timeseriesData);
// Wait before device timeseries saved to database before requesting them from edge
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.until(() -> {
String urlTemplate = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/keys/timeseries";
List<String> actualKeys = doGetAsyncTyped(urlTemplate, new TypeReference<>() {});
return actualKeys != null && !actualKeys.isEmpty() && actualKeys.contains("temperature");
});
UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
AttributesRequestMsg.Builder attributesRequestMsgBuilder = AttributesRequestMsg.newBuilder();
attributesRequestMsgBuilder.setEntityIdMSB(device.getUuidId().getMostSignificantBits());
attributesRequestMsgBuilder.setEntityIdLSB(device.getUuidId().getLeastSignificantBits());
attributesRequestMsgBuilder.setEntityType(EntityType.DEVICE.name());
attributesRequestMsgBuilder.setScope(DataConstants.SERVER_SCOPE);
uplinkMsgBuilder.addAttributesRequestMsg(attributesRequestMsgBuilder.build());
edgeImitator.expectResponsesAmount(1);
edgeImitator.expectMessageAmount(1);
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
Assert.assertTrue(edgeImitator.waitForResponses());
Assert.assertTrue(edgeImitator.waitForMessages());
AbstractMessage latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof EntityDataProto);
EntityDataProto latestEntityDataMsg = (EntityDataProto) latestMessage;
Assert.assertEquals(device.getUuidId().getMostSignificantBits(), latestEntityDataMsg.getEntityIdMSB());
Assert.assertEquals(device.getUuidId().getLeastSignificantBits(), latestEntityDataMsg.getEntityIdLSB());
Assert.assertEquals(device.getId().getEntityType().name(), latestEntityDataMsg.getEntityType());
Assert.assertTrue(latestEntityDataMsg.hasPostTelemetryMsg());
TransportProtos.PostTelemetryMsg timeseriesUpdatedMsg = latestEntityDataMsg.getPostTelemetryMsg();
Assert.assertEquals(1, timeseriesUpdatedMsg.getTsKvListList().size());
TransportProtos.TsKvListProto tsKvListProto = timeseriesUpdatedMsg.getTsKvListList().get(0);
Assert.assertEquals(1, tsKvListProto.getKvList().size());
TransportProtos.KeyValueProto keyValueProto = tsKvListProto.getKvList().get(0);
Assert.assertEquals(25, keyValueProto.getLongV());
Assert.assertEquals("temperature", keyValueProto.getKey());
}
}