Merge pull request #7862 from volodymyr-babak/edge/push-latest-values

[3.5] Push latest timeseries key-value pair to edge on assignment entity to edge
This commit is contained in:
Andrew Shvayka 2022-12-30 15:58:48 +02:00 committed by GitHub
commit 9da5f3cee3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 183 additions and 129 deletions

View File

@ -51,7 +51,7 @@ import java.util.UUID;
public class AlarmEdgeProcessor extends BaseEdgeProcessor {
public ListenableFuture<Void> processAlarmFromEdge(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) {
log.trace("[{}] onAlarmUpdate [{}]", tenantId, alarmUpdateMsg);
log.trace("[{}] processAlarmFromEdge [{}]", tenantId, alarmUpdateMsg);
EntityId originatorId = getAlarmOriginator(tenantId, alarmUpdateMsg.getOriginatorName(),
EntityType.valueOf(alarmUpdateMsg.getOriginatorType()));
if (originatorId == null) {

View File

@ -82,7 +82,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
private static final ReentrantLock deviceCreationLock = new ReentrantLock();
public ListenableFuture<Void> processDeviceFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
log.trace("[{}] onDeviceUpdate [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName());
log.trace("[{}] processDeviceFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName());
switch (deviceUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE:
String deviceName = deviceUpdateMsg.getName();
@ -155,7 +155,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
}
public ListenableFuture<Void> processDeviceCredentialsFromEdge(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
log.debug("Executing onDeviceCredentialsUpdate, deviceCredentialsUpdateMsg [{}]", deviceCredentialsUpdateMsg);
log.debug("[{}] Executing processDeviceCredentialsFromEdge, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg);
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB()));
ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(tenantId, deviceId);
return Futures.transform(deviceFuture, device -> {
@ -201,9 +201,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
device.setCustomerId(getCustomerId(deviceUpdateMsg));
Optional<DeviceData> deviceDataOpt =
dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray());
if (deviceDataOpt.isPresent()) {
device.setDeviceData(deviceDataOpt.get());
}
deviceDataOpt.ifPresent(device::setDeviceData);
Device savedDevice = deviceService.saveDevice(device);
tbClusterService.onDeviceUpdated(savedDevice, device, false);
return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null);
@ -462,7 +460,6 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
}
private DownlinkMsg convertRpcCallEventToDownlink(EdgeEvent edgeEvent) {
log.trace("Executing convertRpcCallEventToDownlink, edgeEvent [{}]", edgeEvent);
return DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addDeviceRpcCallMsg(deviceMsgConstructor.constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody()))

View File

@ -58,7 +58,7 @@ import java.util.UUID;
public class RelationEdgeProcessor extends BaseEdgeProcessor {
public ListenableFuture<Void> processRelationFromEdge(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) {
log.trace("[{}] onRelationUpdate [{}]", tenantId, relationUpdateMsg);
log.trace("[{}] processRelationFromEdge [{}]", tenantId, relationUpdateMsg);
try {
EntityRelation entityRelation = new EntityRelation();

View File

@ -45,6 +45,7 @@ import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.id.WidgetsBundleId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
@ -52,13 +53,10 @@ import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.relation.RelationsSearchParameters;
import org.thingsboard.server.common.data.widget.WidgetType;
import org.thingsboard.server.common.data.widget.WidgetsBundle;
import org.thingsboard.server.dao.asset.AssetProfileService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.device.DeviceProfileService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.edge.EdgeEventService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg;
@ -92,25 +90,16 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
@Autowired
private AttributesService attributesService;
@Autowired
private TimeseriesService timeseriesService;
@Autowired
private RelationService relationService;
@Autowired
private DeviceService deviceService;
@Autowired
private AssetService assetService;
@Lazy
@Autowired
private TbEntityViewService entityViewService;
@Autowired
private DeviceProfileService deviceProfileService;
@Autowired
private AssetProfileService assetProfileService;
@Autowired
private WidgetsBundleService widgetsBundleService;
@ -141,77 +130,89 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
EntityId entityId = EntityIdFactory.getByTypeAndUuid(
EntityType.valueOf(attributesRequestMsg.getEntityType()),
new UUID(attributesRequestMsg.getEntityIdMSB(), attributesRequestMsg.getEntityIdLSB()));
final EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(entityId.getEntityType());
if (type == null) {
final EdgeEventType entityType = EdgeUtils.getEdgeEventTypeByEntityType(entityId.getEntityType());
if (entityType == null) {
log.warn("[{}] Type doesn't supported {}", tenantId, entityId.getEntityType());
return Futures.immediateFuture(null);
}
SettableFuture<Void> futureToSet = SettableFuture.create();
String scope = attributesRequestMsg.getScope();
ListenableFuture<List<AttributeKvEntry>> findAttrFuture = attributesService.findAll(tenantId, entityId, scope);
Futures.addCallback(findAttrFuture, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<AttributeKvEntry> ssAttributes) {
if (ssAttributes == null || ssAttributes.isEmpty()) {
log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId,
edge.getName(),
entityId.getEntityType(),
entityId.getId());
futureToSet.set(null);
return;
}
try {
Map<String, Object> entityData = new HashMap<>();
ObjectNode attributes = JacksonUtil.OBJECT_MAPPER.createObjectNode();
for (AttributeKvEntry attr : ssAttributes) {
if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(attr.getKey())
&& !DefaultDeviceStateService.INACTIVITY_TIMEOUT.equals(attr.getKey())) {
continue;
}
if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) {
attributes.put(attr.getKey(), attr.getBooleanValue().get());
} else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) {
attributes.put(attr.getKey(), attr.getDoubleValue().get());
} else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) {
attributes.put(attr.getKey(), attr.getLongValue().get());
} else {
attributes.put(attr.getKey(), attr.getValueAsString());
}
}
entityData.put("kv", attributes);
entityData.put("scope", scope);
JsonNode body = JacksonUtil.OBJECT_MAPPER.valueToTree(entityData);
log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, body);
ListenableFuture<Void> future = saveEdgeEvent(tenantId, edge.getId(), type, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void unused) {
futureToSet.set(null);
}
@Override
public void onFailure(Throwable throwable) {
String errMsg = String.format("[%s] Failed to save edge event [%s]", edge.getId(), attributesRequestMsg);
log.error(errMsg, throwable);
futureToSet.setException(new RuntimeException(errMsg, throwable));
}
}, dbCallbackExecutorService);
} catch (Exception e) {
String errMsg = String.format("[%s] Failed to save attribute updates to the edge [%s]", edge.getId(), attributesRequestMsg);
log.error(errMsg, e);
futureToSet.setException(new RuntimeException(errMsg, e));
}
}
@Override
public void onFailure(Throwable t) {
String errMsg = String.format("[%s] Can't find attributes [%s]", edge.getId(), attributesRequestMsg);
log.error(errMsg, t);
futureToSet.setException(new RuntimeException(errMsg, t));
return Futures.transformAsync(findAttrFuture, ssAttributes -> {
if (ssAttributes == null || ssAttributes.isEmpty()) {
log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId,
edge.getName(),
entityId.getEntityType(),
entityId.getId());
return Futures.immediateFuture(null);
}
return processEntityAttributesAndAddToEdgeQueue(tenantId, entityId, edge, entityType, scope, ssAttributes, attributesRequestMsg);
}, dbCallbackExecutorService);
}
private ListenableFuture<Void> processEntityAttributesAndAddToEdgeQueue(TenantId tenantId, EntityId entityId, Edge edge,
EdgeEventType entityType, String scope, List<AttributeKvEntry> ssAttributes,
AttributesRequestMsg attributesRequestMsg) {
try {
Map<String, Object> entityData = new HashMap<>();
ObjectNode attributes = JacksonUtil.OBJECT_MAPPER.createObjectNode();
for (AttributeKvEntry attr : ssAttributes) {
if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(attr.getKey())
&& !DefaultDeviceStateService.INACTIVITY_TIMEOUT.equals(attr.getKey())) {
continue;
}
if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) {
attributes.put(attr.getKey(), attr.getBooleanValue().get());
} else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) {
attributes.put(attr.getKey(), attr.getDoubleValue().get());
} else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) {
attributes.put(attr.getKey(), attr.getLongValue().get());
} else {
attributes.put(attr.getKey(), attr.getValueAsString());
}
}
ListenableFuture<Void> future;
if (attributes.size() > 0) {
entityData.put("kv", attributes);
entityData.put("scope", scope);
JsonNode body = JacksonUtil.OBJECT_MAPPER.valueToTree(entityData);
log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, body);
future = saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body);
} else {
future = Futures.immediateFuture(null);
}
return Futures.transformAsync(future, v -> processLatestTimeseriesAndAddToEdgeQueue(tenantId, entityId, edge, entityType), dbCallbackExecutorService);
} catch (Exception e) {
String errMsg = String.format("[%s] Failed to save attribute updates to the edge [%s]", edge.getId(), attributesRequestMsg);
log.error(errMsg, e);
return Futures.immediateFailedFuture(new RuntimeException(errMsg, e));
}
}
private ListenableFuture<Void> processLatestTimeseriesAndAddToEdgeQueue(TenantId tenantId, EntityId entityId, Edge edge,
EdgeEventType entityType) {
ListenableFuture<List<TsKvEntry>> getAllLatestFuture = timeseriesService.findAllLatest(tenantId, entityId);
return Futures.transformAsync(getAllLatestFuture, tsKvEntries -> {
if (tsKvEntries == null || tsKvEntries.isEmpty()) {
log.trace("[{}][{}] No timeseries found for entity {} [{}]", tenantId,
edge.getName(),
entityId.getEntityType(),
entityId.getId());
return Futures.immediateFuture(null);
}
List<ListenableFuture<Void>> futures = new ArrayList<>();
for (TsKvEntry tsKvEntry : tsKvEntries) {
if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(tsKvEntry.getKey())) {
continue;
}
ObjectNode entityBody = JacksonUtil.OBJECT_MAPPER.createObjectNode();
ObjectNode ts = JacksonUtil.OBJECT_MAPPER.createObjectNode();
ts.put(tsKvEntry.getKey(), tsKvEntry.getValueAsString());
entityBody.set("data", ts);
entityBody.put("ts", tsKvEntry.getTs());
futures.add(saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.TIMESERIES_UPDATED, entityId, JacksonUtil.valueToTree(entityBody)));
}
return Futures.transform(Futures.allAsList(futures), v -> null, dbCallbackExecutorService);
}, dbCallbackExecutorService);
return futureToSet;
}
@Override

View File

@ -18,7 +18,6 @@ package org.thingsboard.server.edge;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import org.junit.After;
@ -38,9 +37,6 @@ import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.asset.AssetProfile;
import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration;
import org.thingsboard.server.common.data.device.data.DeviceData;
import org.thingsboard.server.common.data.device.data.MqttDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.AlarmCondition;
import org.thingsboard.server.common.data.device.profile.AlarmConditionFilter;
import org.thingsboard.server.common.data.device.profile.AlarmConditionFilterKey;
@ -447,11 +443,6 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
}
protected Device saveDeviceOnCloudAndVerifyDeliveryToEdge() throws Exception {
// create ota package
edgeImitator.expectMessageAmount(1);
OtaPackageInfo firmwareOtaPackageInfo = saveOtaPackageInfo(thermostatDeviceProfile.getId());
Assert.assertTrue(edgeImitator.waitForMessages());
// create device and assign to edge
Device savedDevice = saveDevice(StringUtils.randomAlphanumeric(15), thermostatDeviceProfile.getName());
edgeImitator.expectMessageAmount(2); // device and device profile messages
@ -471,38 +462,6 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, deviceProfileUpdateMsg.getMsgType());
Assert.assertEquals(thermostatDeviceProfile.getUuidId().getMostSignificantBits(), deviceProfileUpdateMsg.getIdMSB());
Assert.assertEquals(thermostatDeviceProfile.getUuidId().getLeastSignificantBits(), deviceProfileUpdateMsg.getIdLSB());
// update device
edgeImitator.expectMessageAmount(1);
savedDevice.setFirmwareId(firmwareOtaPackageInfo.getId());
DeviceData deviceData = new DeviceData();
deviceData.setConfiguration(new DefaultDeviceConfiguration());
MqttDeviceTransportConfiguration transportConfiguration = new MqttDeviceTransportConfiguration();
transportConfiguration.getProperties().put("topic", "tb_rule_engine.thermostat");
deviceData.setTransportConfiguration(transportConfiguration);
savedDevice.setDeviceData(deviceData);
savedDevice = doPost("/api/device", savedDevice, Device.class);
Assert.assertTrue(edgeImitator.waitForMessages());
AbstractMessage latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof DeviceUpdateMsg);
deviceUpdateMsg = (DeviceUpdateMsg) latestMessage;
Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, deviceUpdateMsg.getMsgType());
Assert.assertEquals(savedDevice.getUuidId().getMostSignificantBits(), deviceUpdateMsg.getIdMSB());
Assert.assertEquals(savedDevice.getUuidId().getLeastSignificantBits(), deviceUpdateMsg.getIdLSB());
Assert.assertEquals(savedDevice.getName(), deviceUpdateMsg.getName());
Assert.assertEquals(savedDevice.getType(), deviceUpdateMsg.getType());
Assert.assertEquals(firmwareOtaPackageInfo.getUuidId().getMostSignificantBits(), deviceUpdateMsg.getFirmwareIdMSB());
Assert.assertEquals(firmwareOtaPackageInfo.getUuidId().getLeastSignificantBits(), deviceUpdateMsg.getFirmwareIdLSB());
Optional<DeviceData> deviceDataOpt =
dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray());
Assert.assertTrue(deviceDataOpt.isPresent());
deviceData = deviceDataOpt.get();
Assert.assertTrue(deviceData.getTransportConfiguration() instanceof MqttDeviceTransportConfiguration);
MqttDeviceTransportConfiguration mqttDeviceTransportConfiguration =
(MqttDeviceTransportConfiguration) deviceData.getTransportConfiguration();
Assert.assertEquals("tb_rule_engine.thermostat", mqttDeviceTransportConfiguration.getProperties().get("topic"));
return savedDevice;
}

View File

@ -31,8 +31,12 @@ import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.OtaPackageInfo;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration;
import org.thingsboard.server.common.data.device.data.DeviceData;
import org.thingsboard.server.common.data.device.data.MqttDeviceTransportConfiguration;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -170,6 +174,8 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
// create device and assign to edge; update device
Device savedDevice = saveDeviceOnCloudAndVerifyDeliveryToEdge();
verifyUpdateFirmwareIdAndDeviceData(savedDevice);
// update device credentials - ACCESS_TOKEN
edgeImitator.expectMessageAmount(1);
DeviceCredentials deviceCredentials =
@ -204,6 +210,45 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
Assert.assertEquals(deviceCredentials.getCredentialsValue(), deviceCredentialsUpdateMsg.getCredentialsValue());
}
private void verifyUpdateFirmwareIdAndDeviceData(Device savedDevice) throws InterruptedException {
// create ota package
edgeImitator.expectMessageAmount(1);
OtaPackageInfo firmwareOtaPackageInfo = saveOtaPackageInfo(thermostatDeviceProfile.getId());
Assert.assertTrue(edgeImitator.waitForMessages());
// update device
edgeImitator.expectMessageAmount(1);
savedDevice.setFirmwareId(firmwareOtaPackageInfo.getId());
DeviceData deviceData = new DeviceData();
deviceData.setConfiguration(new DefaultDeviceConfiguration());
MqttDeviceTransportConfiguration transportConfiguration = new MqttDeviceTransportConfiguration();
transportConfiguration.getProperties().put("topic", "tb_rule_engine.thermostat");
deviceData.setTransportConfiguration(transportConfiguration);
savedDevice.setDeviceData(deviceData);
savedDevice = doPost("/api/device", savedDevice, Device.class);
Assert.assertTrue(edgeImitator.waitForMessages());
AbstractMessage latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof DeviceUpdateMsg);
DeviceUpdateMsg deviceUpdateMsg = (DeviceUpdateMsg) latestMessage;
Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, deviceUpdateMsg.getMsgType());
Assert.assertEquals(savedDevice.getUuidId().getMostSignificantBits(), deviceUpdateMsg.getIdMSB());
Assert.assertEquals(savedDevice.getUuidId().getLeastSignificantBits(), deviceUpdateMsg.getIdLSB());
Assert.assertEquals(savedDevice.getName(), deviceUpdateMsg.getName());
Assert.assertEquals(savedDevice.getType(), deviceUpdateMsg.getType());
Assert.assertEquals(firmwareOtaPackageInfo.getUuidId().getMostSignificantBits(), deviceUpdateMsg.getFirmwareIdMSB());
Assert.assertEquals(firmwareOtaPackageInfo.getUuidId().getLeastSignificantBits(), deviceUpdateMsg.getFirmwareIdLSB());
Optional<DeviceData> deviceDataOpt =
dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray());
Assert.assertTrue(deviceDataOpt.isPresent());
deviceData = deviceDataOpt.get();
Assert.assertTrue(deviceData.getTransportConfiguration() instanceof MqttDeviceTransportConfiguration);
MqttDeviceTransportConfiguration mqttDeviceTransportConfiguration =
(MqttDeviceTransportConfiguration) deviceData.getTransportConfiguration();
Assert.assertEquals("tb_rule_engine.thermostat", mqttDeviceTransportConfiguration.getProperties().get("topic"));
}
@Test
public void testDeviceReachedMaximumAllowedOnCloud() throws Exception {
// update tenant profile configuration
@ -323,6 +368,9 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
"inactivityTimeout", "3600000");
sendAttributesRequestAndVerify(device, DataConstants.SHARED_SCOPE, "{\"key2\":\"value2\"}",
"key2", "value2");
doDelete("/api/plugins/telemetry/DEVICE/" + device.getUuidId() + "/" + DataConstants.SERVER_SCOPE, "keys","key1, inactivityTimeout");
doDelete("/api/plugins/telemetry/DEVICE/" + device.getUuidId() + "/" + DataConstants.SHARED_SCOPE, "keys", "key2");
}
@Test
@ -640,4 +688,53 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
client.disconnect();
}
@Test
public void testVerifyDeliveryOfLatestTimeseriesOnAttributesRequest() throws Exception {
Device device = findDeviceByName("Edge Device 1");
JsonNode timeseriesData = mapper.readTree("{\"temperature\":25}");
doPost("/api/plugins/telemetry/DEVICE/" + device.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE,
timeseriesData);
// Wait before device timeseries saved to database before requesting them from edge
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.until(() -> {
String urlTemplate = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/keys/timeseries";
List<String> actualKeys = doGetAsyncTyped(urlTemplate, new TypeReference<>() {});
return actualKeys != null && !actualKeys.isEmpty() && actualKeys.contains("temperature");
});
UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
AttributesRequestMsg.Builder attributesRequestMsgBuilder = AttributesRequestMsg.newBuilder();
attributesRequestMsgBuilder.setEntityIdMSB(device.getUuidId().getMostSignificantBits());
attributesRequestMsgBuilder.setEntityIdLSB(device.getUuidId().getLeastSignificantBits());
attributesRequestMsgBuilder.setEntityType(EntityType.DEVICE.name());
attributesRequestMsgBuilder.setScope(DataConstants.SERVER_SCOPE);
uplinkMsgBuilder.addAttributesRequestMsg(attributesRequestMsgBuilder.build());
edgeImitator.expectResponsesAmount(1);
edgeImitator.expectMessageAmount(1);
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
Assert.assertTrue(edgeImitator.waitForResponses());
Assert.assertTrue(edgeImitator.waitForMessages());
AbstractMessage latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof EntityDataProto);
EntityDataProto latestEntityDataMsg = (EntityDataProto) latestMessage;
Assert.assertEquals(device.getUuidId().getMostSignificantBits(), latestEntityDataMsg.getEntityIdMSB());
Assert.assertEquals(device.getUuidId().getLeastSignificantBits(), latestEntityDataMsg.getEntityIdLSB());
Assert.assertEquals(device.getId().getEntityType().name(), latestEntityDataMsg.getEntityType());
Assert.assertTrue(latestEntityDataMsg.hasPostTelemetryMsg());
TransportProtos.PostTelemetryMsg timeseriesUpdatedMsg = latestEntityDataMsg.getPostTelemetryMsg();
Assert.assertEquals(1, timeseriesUpdatedMsg.getTsKvListList().size());
TransportProtos.TsKvListProto tsKvListProto = timeseriesUpdatedMsg.getTsKvListList().get(0);
Assert.assertEquals(1, tsKvListProto.getKvList().size());
TransportProtos.KeyValueProto keyValueProto = tsKvListProto.getKvList().get(0);
Assert.assertEquals(25, keyValueProto.getLongV());
Assert.assertEquals("temperature", keyValueProto.getKey());
}
}