From bcdda93de3d7b1568251aa571dc9392677f28ed9 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 27 Dec 2022 16:52:51 +0200 Subject: [PATCH 1/4] Improve logging of edge services and processors --- .../service/edge/rpc/processor/AlarmEdgeProcessor.java | 2 +- .../service/edge/rpc/processor/DeviceEdgeProcessor.java | 9 +++------ .../edge/rpc/processor/RelationEdgeProcessor.java | 2 +- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmEdgeProcessor.java index b5d256bfa8..5f4395fb47 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmEdgeProcessor.java @@ -51,7 +51,7 @@ import java.util.UUID; public class AlarmEdgeProcessor extends BaseEdgeProcessor { public ListenableFuture processAlarmFromEdge(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) { - log.trace("[{}] onAlarmUpdate [{}]", tenantId, alarmUpdateMsg); + log.trace("[{}] processAlarmFromEdge [{}]", tenantId, alarmUpdateMsg); EntityId originatorId = getAlarmOriginator(tenantId, alarmUpdateMsg.getOriginatorName(), EntityType.valueOf(alarmUpdateMsg.getOriginatorType())); if (originatorId == null) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java index 81e7970611..02aca5165c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java @@ -82,7 +82,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { private static final ReentrantLock deviceCreationLock = new ReentrantLock(); public ListenableFuture processDeviceFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { - log.trace("[{}] onDeviceUpdate [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); + log.trace("[{}] processDeviceFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); switch (deviceUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: String deviceName = deviceUpdateMsg.getName(); @@ -155,7 +155,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { } public ListenableFuture processDeviceCredentialsFromEdge(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) { - log.debug("Executing onDeviceCredentialsUpdate, deviceCredentialsUpdateMsg [{}]", deviceCredentialsUpdateMsg); + log.debug("[{}] Executing processDeviceCredentialsFromEdge, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg); DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB())); ListenableFuture deviceFuture = deviceService.findDeviceByIdAsync(tenantId, deviceId); return Futures.transform(deviceFuture, device -> { @@ -201,9 +201,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { device.setCustomerId(getCustomerId(deviceUpdateMsg)); Optional deviceDataOpt = dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray()); - if (deviceDataOpt.isPresent()) { - device.setDeviceData(deviceDataOpt.get()); - } + deviceDataOpt.ifPresent(device::setDeviceData); Device savedDevice = deviceService.saveDevice(device); tbClusterService.onDeviceUpdated(savedDevice, device, false); return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null); @@ -462,7 +460,6 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { } private DownlinkMsg convertRpcCallEventToDownlink(EdgeEvent edgeEvent) { - log.trace("Executing convertRpcCallEventToDownlink, edgeEvent [{}]", edgeEvent); return DownlinkMsg.newBuilder() .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) .addDeviceRpcCallMsg(deviceMsgConstructor.constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody())) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java index 2998001a0a..8b07d771d6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java @@ -58,7 +58,7 @@ import java.util.UUID; public class RelationEdgeProcessor extends BaseEdgeProcessor { public ListenableFuture processRelationFromEdge(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) { - log.trace("[{}] onRelationUpdate [{}]", tenantId, relationUpdateMsg); + log.trace("[{}] processRelationFromEdge [{}]", tenantId, relationUpdateMsg); try { EntityRelation entityRelation = new EntityRelation(); From 031706e1cf4033ad33efeed417eb412a01f46d6b Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 27 Dec 2022 16:53:26 +0200 Subject: [PATCH 2/4] Added functionality to push latest timeseries values to edge on assing entities to edge --- .../rpc/sync/DefaultEdgeRequestsService.java | 161 +++++++++--------- .../server/edge/BaseDeviceEdgeTest.java | 52 ++++++ 2 files changed, 133 insertions(+), 80 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index 4be7ddb1c8..2f72d96770 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -45,6 +45,7 @@ import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.id.WidgetsBundleId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.DataType; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntityRelationsQuery; import org.thingsboard.server.common.data.relation.EntitySearchDirection; @@ -52,13 +53,10 @@ import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.relation.RelationsSearchParameters; import org.thingsboard.server.common.data.widget.WidgetType; import org.thingsboard.server.common.data.widget.WidgetsBundle; -import org.thingsboard.server.dao.asset.AssetProfileService; -import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.attributes.AttributesService; -import org.thingsboard.server.dao.device.DeviceProfileService; -import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.edge.EdgeEventService; import org.thingsboard.server.dao.relation.RelationService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg; @@ -92,25 +90,16 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { @Autowired private AttributesService attributesService; + @Autowired + private TimeseriesService timeseriesService; + @Autowired private RelationService relationService; - @Autowired - private DeviceService deviceService; - - @Autowired - private AssetService assetService; - @Lazy @Autowired private TbEntityViewService entityViewService; - @Autowired - private DeviceProfileService deviceProfileService; - - @Autowired - private AssetProfileService assetProfileService; - @Autowired private WidgetsBundleService widgetsBundleService; @@ -141,77 +130,89 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { EntityId entityId = EntityIdFactory.getByTypeAndUuid( EntityType.valueOf(attributesRequestMsg.getEntityType()), new UUID(attributesRequestMsg.getEntityIdMSB(), attributesRequestMsg.getEntityIdLSB())); - final EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(entityId.getEntityType()); - if (type == null) { + final EdgeEventType entityType = EdgeUtils.getEdgeEventTypeByEntityType(entityId.getEntityType()); + if (entityType == null) { log.warn("[{}] Type doesn't supported {}", tenantId, entityId.getEntityType()); return Futures.immediateFuture(null); } - SettableFuture futureToSet = SettableFuture.create(); String scope = attributesRequestMsg.getScope(); ListenableFuture> findAttrFuture = attributesService.findAll(tenantId, entityId, scope); - Futures.addCallback(findAttrFuture, new FutureCallback<>() { - @Override - public void onSuccess(@Nullable List ssAttributes) { - if (ssAttributes == null || ssAttributes.isEmpty()) { - log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId, - edge.getName(), - entityId.getEntityType(), - entityId.getId()); - futureToSet.set(null); - return; - } - - try { - Map entityData = new HashMap<>(); - ObjectNode attributes = JacksonUtil.OBJECT_MAPPER.createObjectNode(); - for (AttributeKvEntry attr : ssAttributes) { - if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(attr.getKey()) - && !DefaultDeviceStateService.INACTIVITY_TIMEOUT.equals(attr.getKey())) { - continue; - } - if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) { - attributes.put(attr.getKey(), attr.getBooleanValue().get()); - } else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) { - attributes.put(attr.getKey(), attr.getDoubleValue().get()); - } else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) { - attributes.put(attr.getKey(), attr.getLongValue().get()); - } else { - attributes.put(attr.getKey(), attr.getValueAsString()); - } - } - entityData.put("kv", attributes); - entityData.put("scope", scope); - JsonNode body = JacksonUtil.OBJECT_MAPPER.valueToTree(entityData); - log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, body); - ListenableFuture future = saveEdgeEvent(tenantId, edge.getId(), type, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body); - Futures.addCallback(future, new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Void unused) { - futureToSet.set(null); - } - - @Override - public void onFailure(Throwable throwable) { - String errMsg = String.format("[%s] Failed to save edge event [%s]", edge.getId(), attributesRequestMsg); - log.error(errMsg, throwable); - futureToSet.setException(new RuntimeException(errMsg, throwable)); - } - }, dbCallbackExecutorService); - } catch (Exception e) { - String errMsg = String.format("[%s] Failed to save attribute updates to the edge [%s]", edge.getId(), attributesRequestMsg); - log.error(errMsg, e); - futureToSet.setException(new RuntimeException(errMsg, e)); - } - } - - @Override - public void onFailure(Throwable t) { - String errMsg = String.format("[%s] Can't find attributes [%s]", edge.getId(), attributesRequestMsg); - log.error(errMsg, t); - futureToSet.setException(new RuntimeException(errMsg, t)); + return Futures.transformAsync(findAttrFuture, ssAttributes -> { + if (ssAttributes == null || ssAttributes.isEmpty()) { + log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId, + edge.getName(), + entityId.getEntityType(), + entityId.getId()); + return Futures.immediateFuture(null); } + return processEntityAttributesAndAddToEdgeQueue(tenantId, entityId, edge, entityType, scope, ssAttributes, attributesRequestMsg); + }, dbCallbackExecutorService); + } + + private ListenableFuture processEntityAttributesAndAddToEdgeQueue(TenantId tenantId, EntityId entityId, Edge edge, + EdgeEventType entityType, String scope, List ssAttributes, + AttributesRequestMsg attributesRequestMsg) { + try { + Map entityData = new HashMap<>(); + ObjectNode attributes = JacksonUtil.OBJECT_MAPPER.createObjectNode(); + for (AttributeKvEntry attr : ssAttributes) { + if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(attr.getKey()) + && !DefaultDeviceStateService.INACTIVITY_TIMEOUT.equals(attr.getKey())) { + continue; + } + if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) { + attributes.put(attr.getKey(), attr.getBooleanValue().get()); + } else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) { + attributes.put(attr.getKey(), attr.getDoubleValue().get()); + } else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) { + attributes.put(attr.getKey(), attr.getLongValue().get()); + } else { + attributes.put(attr.getKey(), attr.getValueAsString()); + } + } + ListenableFuture future; + if (attributes.size() > 0) { + entityData.put("kv", attributes); + entityData.put("scope", scope); + JsonNode body = JacksonUtil.OBJECT_MAPPER.valueToTree(entityData); + log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, body); + future = saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body); + } else { + future = Futures.immediateFuture(null); + } + return Futures.transformAsync(future, v -> processLatestTimeseriesAndAddToEdgeQueue(tenantId, entityId, edge, entityType), dbCallbackExecutorService); + } catch (Exception e) { + String errMsg = String.format("[%s] Failed to save attribute updates to the edge [%s]", edge.getId(), attributesRequestMsg); + log.error(errMsg, e); + return Futures.immediateFailedFuture(new RuntimeException(errMsg, e)); + } + } + + private ListenableFuture processLatestTimeseriesAndAddToEdgeQueue(TenantId tenantId, EntityId entityId, Edge edge, + EdgeEventType entityType) { + ListenableFuture> getAllLatestFuture = timeseriesService.findAllLatest(tenantId, entityId); + return Futures.transformAsync(getAllLatestFuture, tsKvEntries -> { + if (tsKvEntries == null || tsKvEntries.isEmpty()) { + log.trace("[{}][{}] No timeseries found for entity {} [{}]", tenantId, + edge.getName(), + entityId.getEntityType(), + entityId.getId()); + return Futures.immediateFuture(null); + } + List> futures = new ArrayList<>(); + for (TsKvEntry tsKvEntry : tsKvEntries) { + if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(tsKvEntry.getKey())) { + continue; + } + ObjectNode entityBody = JacksonUtil.OBJECT_MAPPER.createObjectNode(); + ObjectNode ts = JacksonUtil.OBJECT_MAPPER.createObjectNode(); + ts.put(tsKvEntry.getKey(), tsKvEntry.getValueAsString()); + entityBody.set("data", ts); + entityBody.put("ts", tsKvEntry.getTs()); + futures.add(saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.TIMESERIES_UPDATED, entityId, JacksonUtil.valueToTree(entityBody))); + } + return Futures.transform(Futures.allAsList(futures), v -> null, dbCallbackExecutorService); }, dbCallbackExecutorService); - return futureToSet; } @Override diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java index 649bf26210..1b19f481f2 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java @@ -323,6 +323,9 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { "inactivityTimeout", "3600000"); sendAttributesRequestAndVerify(device, DataConstants.SHARED_SCOPE, "{\"key2\":\"value2\"}", "key2", "value2"); + + doDelete("/api/plugins/telemetry/DEVICE/" + device.getUuidId() + "/" + DataConstants.SERVER_SCOPE, "keys","key1, inactivityTimeout"); + doDelete("/api/plugins/telemetry/DEVICE/" + device.getUuidId() + "/" + DataConstants.SHARED_SCOPE, "keys", "key2"); } @Test @@ -640,4 +643,53 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { client.disconnect(); } + + @Test + public void testVerifyDeliveryOfLatestTimeseriesOnAttributesRequest() throws Exception { + Device device = findDeviceByName("Edge Device 1"); + + JsonNode timeseriesData = mapper.readTree("{\"temperature\":25}"); + + doPost("/api/plugins/telemetry/DEVICE/" + device.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE, + timeseriesData); + + // Wait before device timeseries saved to database before requesting them from edge + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .until(() -> { + String urlTemplate = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/keys/timeseries"; + List actualKeys = doGetAsyncTyped(urlTemplate, new TypeReference<>() {}); + return actualKeys != null && !actualKeys.isEmpty() && actualKeys.contains("temperature"); + }); + + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); + AttributesRequestMsg.Builder attributesRequestMsgBuilder = AttributesRequestMsg.newBuilder(); + attributesRequestMsgBuilder.setEntityIdMSB(device.getUuidId().getMostSignificantBits()); + attributesRequestMsgBuilder.setEntityIdLSB(device.getUuidId().getLeastSignificantBits()); + attributesRequestMsgBuilder.setEntityType(EntityType.DEVICE.name()); + attributesRequestMsgBuilder.setScope(DataConstants.SERVER_SCOPE); + uplinkMsgBuilder.addAttributesRequestMsg(attributesRequestMsgBuilder.build()); + + edgeImitator.expectResponsesAmount(1); + edgeImitator.expectMessageAmount(1); + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + Assert.assertTrue(edgeImitator.waitForResponses()); + Assert.assertTrue(edgeImitator.waitForMessages()); + + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof EntityDataProto); + EntityDataProto latestEntityDataMsg = (EntityDataProto) latestMessage; + Assert.assertEquals(device.getUuidId().getMostSignificantBits(), latestEntityDataMsg.getEntityIdMSB()); + Assert.assertEquals(device.getUuidId().getLeastSignificantBits(), latestEntityDataMsg.getEntityIdLSB()); + Assert.assertEquals(device.getId().getEntityType().name(), latestEntityDataMsg.getEntityType()); + Assert.assertTrue(latestEntityDataMsg.hasPostTelemetryMsg()); + + TransportProtos.PostTelemetryMsg timeseriesUpdatedMsg = latestEntityDataMsg.getPostTelemetryMsg(); + Assert.assertEquals(1, timeseriesUpdatedMsg.getTsKvListList().size()); + TransportProtos.TsKvListProto tsKvListProto = timeseriesUpdatedMsg.getTsKvListList().get(0); + Assert.assertEquals(1, tsKvListProto.getKvList().size()); + TransportProtos.KeyValueProto keyValueProto = tsKvListProto.getKvList().get(0); + Assert.assertEquals(25, keyValueProto.getLongV()); + Assert.assertEquals("temperature", keyValueProto.getKey()); + } } From 99a3e9769fa7f8440b773e9932e3d9ff7dc4b8e6 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 28 Dec 2022 16:02:56 +0200 Subject: [PATCH 3/4] Edge test - speed up test by moving firmware and device data check into separate method --- .../server/edge/AbstractEdgeTest.java | 41 ----------------- .../server/edge/BaseDeviceEdgeTest.java | 44 +++++++++++++++++++ 2 files changed, 44 insertions(+), 41 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java index 02406da08a..5314d4872f 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java @@ -18,7 +18,6 @@ package org.thingsboard.server.edge; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; -import com.google.protobuf.AbstractMessage; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.MessageLite; import org.junit.After; @@ -38,9 +37,6 @@ import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.alarm.AlarmSeverity; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.AssetProfile; -import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration; -import org.thingsboard.server.common.data.device.data.DeviceData; -import org.thingsboard.server.common.data.device.data.MqttDeviceTransportConfiguration; import org.thingsboard.server.common.data.device.profile.AlarmCondition; import org.thingsboard.server.common.data.device.profile.AlarmConditionFilter; import org.thingsboard.server.common.data.device.profile.AlarmConditionFilterKey; @@ -447,11 +443,6 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { } protected Device saveDeviceOnCloudAndVerifyDeliveryToEdge() throws Exception { - // create ota package - edgeImitator.expectMessageAmount(1); - OtaPackageInfo firmwareOtaPackageInfo = saveOtaPackageInfo(thermostatDeviceProfile.getId()); - Assert.assertTrue(edgeImitator.waitForMessages()); - // create device and assign to edge Device savedDevice = saveDevice(StringUtils.randomAlphanumeric(15), thermostatDeviceProfile.getName()); edgeImitator.expectMessageAmount(2); // device and device profile messages @@ -471,38 +462,6 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, deviceProfileUpdateMsg.getMsgType()); Assert.assertEquals(thermostatDeviceProfile.getUuidId().getMostSignificantBits(), deviceProfileUpdateMsg.getIdMSB()); Assert.assertEquals(thermostatDeviceProfile.getUuidId().getLeastSignificantBits(), deviceProfileUpdateMsg.getIdLSB()); - - // update device - edgeImitator.expectMessageAmount(1); - savedDevice.setFirmwareId(firmwareOtaPackageInfo.getId()); - - DeviceData deviceData = new DeviceData(); - deviceData.setConfiguration(new DefaultDeviceConfiguration()); - MqttDeviceTransportConfiguration transportConfiguration = new MqttDeviceTransportConfiguration(); - transportConfiguration.getProperties().put("topic", "tb_rule_engine.thermostat"); - deviceData.setTransportConfiguration(transportConfiguration); - savedDevice.setDeviceData(deviceData); - - savedDevice = doPost("/api/device", savedDevice, Device.class); - Assert.assertTrue(edgeImitator.waitForMessages()); - AbstractMessage latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof DeviceUpdateMsg); - deviceUpdateMsg = (DeviceUpdateMsg) latestMessage; - Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, deviceUpdateMsg.getMsgType()); - Assert.assertEquals(savedDevice.getUuidId().getMostSignificantBits(), deviceUpdateMsg.getIdMSB()); - Assert.assertEquals(savedDevice.getUuidId().getLeastSignificantBits(), deviceUpdateMsg.getIdLSB()); - Assert.assertEquals(savedDevice.getName(), deviceUpdateMsg.getName()); - Assert.assertEquals(savedDevice.getType(), deviceUpdateMsg.getType()); - Assert.assertEquals(firmwareOtaPackageInfo.getUuidId().getMostSignificantBits(), deviceUpdateMsg.getFirmwareIdMSB()); - Assert.assertEquals(firmwareOtaPackageInfo.getUuidId().getLeastSignificantBits(), deviceUpdateMsg.getFirmwareIdLSB()); - Optional deviceDataOpt = - dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray()); - Assert.assertTrue(deviceDataOpt.isPresent()); - deviceData = deviceDataOpt.get(); - Assert.assertTrue(deviceData.getTransportConfiguration() instanceof MqttDeviceTransportConfiguration); - MqttDeviceTransportConfiguration mqttDeviceTransportConfiguration = - (MqttDeviceTransportConfiguration) deviceData.getTransportConfiguration(); - Assert.assertEquals("tb_rule_engine.thermostat", mqttDeviceTransportConfiguration.getProperties().get("topic")); return savedDevice; } diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java index 1b19f481f2..912d8c9df7 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java @@ -31,8 +31,12 @@ import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.OtaPackageInfo; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration; +import org.thingsboard.server.common.data.device.data.DeviceData; +import org.thingsboard.server.common.data.device.data.MqttDeviceTransportConfiguration; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; @@ -78,6 +82,7 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { public void testDevices() throws Exception { // create device and assign to edge; update device Device savedDevice = saveDeviceOnCloudAndVerifyDeliveryToEdge(); + verifyUpdateFirmwareIdAndDeviceData(savedDevice); // unassign device from edge edgeImitator.expectMessageAmount(1); @@ -165,6 +170,45 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { } + private void verifyUpdateFirmwareIdAndDeviceData(Device savedDevice) throws InterruptedException { + // create ota package + edgeImitator.expectMessageAmount(1); + OtaPackageInfo firmwareOtaPackageInfo = saveOtaPackageInfo(thermostatDeviceProfile.getId()); + Assert.assertTrue(edgeImitator.waitForMessages()); + + // update device + edgeImitator.expectMessageAmount(1); + savedDevice.setFirmwareId(firmwareOtaPackageInfo.getId()); + + DeviceData deviceData = new DeviceData(); + deviceData.setConfiguration(new DefaultDeviceConfiguration()); + MqttDeviceTransportConfiguration transportConfiguration = new MqttDeviceTransportConfiguration(); + transportConfiguration.getProperties().put("topic", "tb_rule_engine.thermostat"); + deviceData.setTransportConfiguration(transportConfiguration); + savedDevice.setDeviceData(deviceData); + + savedDevice = doPost("/api/device", savedDevice, Device.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof DeviceUpdateMsg); + DeviceUpdateMsg deviceUpdateMsg = (DeviceUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, deviceUpdateMsg.getMsgType()); + Assert.assertEquals(savedDevice.getUuidId().getMostSignificantBits(), deviceUpdateMsg.getIdMSB()); + Assert.assertEquals(savedDevice.getUuidId().getLeastSignificantBits(), deviceUpdateMsg.getIdLSB()); + Assert.assertEquals(savedDevice.getName(), deviceUpdateMsg.getName()); + Assert.assertEquals(savedDevice.getType(), deviceUpdateMsg.getType()); + Assert.assertEquals(firmwareOtaPackageInfo.getUuidId().getMostSignificantBits(), deviceUpdateMsg.getFirmwareIdMSB()); + Assert.assertEquals(firmwareOtaPackageInfo.getUuidId().getLeastSignificantBits(), deviceUpdateMsg.getFirmwareIdLSB()); + Optional deviceDataOpt = + dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray()); + Assert.assertTrue(deviceDataOpt.isPresent()); + deviceData = deviceDataOpt.get(); + Assert.assertTrue(deviceData.getTransportConfiguration() instanceof MqttDeviceTransportConfiguration); + MqttDeviceTransportConfiguration mqttDeviceTransportConfiguration = + (MqttDeviceTransportConfiguration) deviceData.getTransportConfiguration(); + Assert.assertEquals("tb_rule_engine.thermostat", mqttDeviceTransportConfiguration.getProperties().get("topic")); + } + @Test public void testUpdateDeviceCredentials() throws Exception { // create device and assign to edge; update device From 2a492a3a157abe17e708b72bc01203bdb7e89dd8 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 28 Dec 2022 16:07:49 +0200 Subject: [PATCH 4/4] Move firmware and device data check into update device credentials test --- .../server/edge/BaseDeviceEdgeTest.java | 81 ++++++++++--------- 1 file changed, 41 insertions(+), 40 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java index 912d8c9df7..4af8fede14 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java @@ -82,7 +82,6 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { public void testDevices() throws Exception { // create device and assign to edge; update device Device savedDevice = saveDeviceOnCloudAndVerifyDeliveryToEdge(); - verifyUpdateFirmwareIdAndDeviceData(savedDevice); // unassign device from edge edgeImitator.expectMessageAmount(1); @@ -170,6 +169,47 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { } + @Test + public void testUpdateDeviceCredentials() throws Exception { + // create device and assign to edge; update device + Device savedDevice = saveDeviceOnCloudAndVerifyDeliveryToEdge(); + + verifyUpdateFirmwareIdAndDeviceData(savedDevice); + + // update device credentials - ACCESS_TOKEN + edgeImitator.expectMessageAmount(1); + DeviceCredentials deviceCredentials = + doGet("/api/device/" + savedDevice.getId().getId() + "/credentials", DeviceCredentials.class); + Assert.assertEquals(savedDevice.getId(), deviceCredentials.getDeviceId()); + deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN); + deviceCredentials.setCredentialsId("access_token"); + doPost("/api/device/credentials", deviceCredentials) + .andExpect(status().isOk()); + Assert.assertTrue(edgeImitator.waitForMessages()); + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof DeviceCredentialsUpdateMsg); + DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = (DeviceCredentialsUpdateMsg) latestMessage; + Assert.assertEquals(deviceCredentials.getCredentialsType().name(), deviceCredentialsUpdateMsg.getCredentialsType()); + Assert.assertEquals(deviceCredentials.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsId()); + Assert.assertFalse(deviceCredentialsUpdateMsg.hasCredentialsValue()); + + // update device credentials - X509_CERTIFICATE + edgeImitator.expectMessageAmount(1); + deviceCredentials.setCredentialsType(DeviceCredentialsType.X509_CERTIFICATE); + deviceCredentials.setCredentialsId(null); + deviceCredentials.setCredentialsValue("-----BEGIN RSA PRIVATE KEY-----"); + doPost("/api/device/credentials", deviceCredentials) + .andExpect(status().isOk()); + Assert.assertTrue(edgeImitator.waitForMessages()); + latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof DeviceCredentialsUpdateMsg); + deviceCredentialsUpdateMsg = (DeviceCredentialsUpdateMsg) latestMessage; + Assert.assertEquals(deviceCredentials.getCredentialsType().name(), deviceCredentialsUpdateMsg.getCredentialsType()); + Assert.assertFalse(deviceCredentialsUpdateMsg.getCredentialsId().isEmpty()); + Assert.assertTrue(deviceCredentialsUpdateMsg.hasCredentialsValue()); + Assert.assertEquals(deviceCredentials.getCredentialsValue(), deviceCredentialsUpdateMsg.getCredentialsValue()); + } + private void verifyUpdateFirmwareIdAndDeviceData(Device savedDevice) throws InterruptedException { // create ota package edgeImitator.expectMessageAmount(1); @@ -209,45 +249,6 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { Assert.assertEquals("tb_rule_engine.thermostat", mqttDeviceTransportConfiguration.getProperties().get("topic")); } - @Test - public void testUpdateDeviceCredentials() throws Exception { - // create device and assign to edge; update device - Device savedDevice = saveDeviceOnCloudAndVerifyDeliveryToEdge(); - - // update device credentials - ACCESS_TOKEN - edgeImitator.expectMessageAmount(1); - DeviceCredentials deviceCredentials = - doGet("/api/device/" + savedDevice.getId().getId() + "/credentials", DeviceCredentials.class); - Assert.assertEquals(savedDevice.getId(), deviceCredentials.getDeviceId()); - deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN); - deviceCredentials.setCredentialsId("access_token"); - doPost("/api/device/credentials", deviceCredentials) - .andExpect(status().isOk()); - Assert.assertTrue(edgeImitator.waitForMessages()); - AbstractMessage latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof DeviceCredentialsUpdateMsg); - DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = (DeviceCredentialsUpdateMsg) latestMessage; - Assert.assertEquals(deviceCredentials.getCredentialsType().name(), deviceCredentialsUpdateMsg.getCredentialsType()); - Assert.assertEquals(deviceCredentials.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsId()); - Assert.assertFalse(deviceCredentialsUpdateMsg.hasCredentialsValue()); - - // update device credentials - X509_CERTIFICATE - edgeImitator.expectMessageAmount(1); - deviceCredentials.setCredentialsType(DeviceCredentialsType.X509_CERTIFICATE); - deviceCredentials.setCredentialsId(null); - deviceCredentials.setCredentialsValue("-----BEGIN RSA PRIVATE KEY-----"); - doPost("/api/device/credentials", deviceCredentials) - .andExpect(status().isOk()); - Assert.assertTrue(edgeImitator.waitForMessages()); - latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof DeviceCredentialsUpdateMsg); - deviceCredentialsUpdateMsg = (DeviceCredentialsUpdateMsg) latestMessage; - Assert.assertEquals(deviceCredentials.getCredentialsType().name(), deviceCredentialsUpdateMsg.getCredentialsType()); - Assert.assertFalse(deviceCredentialsUpdateMsg.getCredentialsId().isEmpty()); - Assert.assertTrue(deviceCredentialsUpdateMsg.hasCredentialsValue()); - Assert.assertEquals(deviceCredentials.getCredentialsValue(), deviceCredentialsUpdateMsg.getCredentialsValue()); - } - @Test public void testDeviceReachedMaximumAllowedOnCloud() throws Exception { // update tenant profile configuration