From ecda9c8e2fb1cc7254798c11c7ee8ea9763d7630 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 6 Sep 2022 13:38:10 +0300 Subject: [PATCH 1/2] Notify devices in case shared attribute updates from edge --- .../edge/rpc/processor/BaseEdgeProcessor.java | 8 ++++ .../rpc/processor/TelemetryEdgeProcessor.java | 41 +++++++++--------- .../thingsboard/server/edge/BaseEdgeTest.java | 42 ++++++++++++++++++- 3 files changed, 69 insertions(+), 22 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 9d6199bdcd..2be5a5684d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -74,9 +74,11 @@ import org.thingsboard.server.service.edge.rpc.constructor.RuleChainMsgConstruct import org.thingsboard.server.service.edge.rpc.constructor.UserMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.WidgetTypeMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.WidgetsBundleMsgConstructor; +import org.thingsboard.server.service.entitiy.TbNotificationEntityService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.state.DeviceStateService; +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.util.ArrayList; import java.util.List; @@ -88,6 +90,12 @@ public abstract class BaseEdgeProcessor { protected static final int DEFAULT_PAGE_SIZE = 1000; + @Autowired + protected TelemetrySubscriptionService tsSubService; + + @Autowired(required = false) + protected TbNotificationEntityService notificationEntityService; + @Autowired protected RuleChainService ruleChainService; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java index ad6446cb95..32833da649 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java @@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.id.AssetId; @@ -57,6 +58,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.util.JsonUtils; +import org.thingsboard.server.controller.BaseController; import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EntityDataProto; @@ -101,7 +103,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { } if (entityData.hasAttributesUpdatedMsg()) { metaData.putValue("scope", entityData.getPostAttributeScope()); - result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); + result.add(processAttributesUpdate(tenantId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); } if (entityData.hasPostTelemetryMsg()) { result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); @@ -221,39 +223,36 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { return futureToSet; } - private ListenableFuture processAttributesUpdate(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { + private ListenableFuture processAttributesUpdate(TenantId tenantId, + EntityId entityId, + TransportProtos.PostAttributeMsg msg, + TbMsgMetaData metaData) { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); - Set attributes = JsonConverter.convertToAttributes(json); - ListenableFuture> future = attributesService.save(tenantId, entityId, metaData.getValue("scope"), new ArrayList<>(attributes)); - Futures.addCallback(future, new FutureCallback<>() { + List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json)); + String scope = metaData.getValue("scope"); + tsSubService.saveAndNotify(tenantId, entityId, scope, attributes, new FutureCallback() { @Override - public void onSuccess(@Nullable List keys) { - var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); - TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), DataConstants.ATTRIBUTES_UPDATED, entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null); - tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - futureToSet.set(null); - } - - @Override - public void onFailure(Throwable t) { - log.error("Can't process attributes update [{}]", msg, t); - futureToSet.setException(t); - } - }); + public void onSuccess(@Nullable Void tmp) { + logAttributesUpdated(tenantId, entityId, scope, attributes, null); + futureToSet.set(null); } @Override public void onFailure(Throwable t) { log.error("Can't process attributes update [{}]", msg, t); + logAttributesUpdated(tenantId, entityId, scope, attributes, t); futureToSet.setException(t); } - }, dbCallbackExecutorService); + }); return futureToSet; } + private void logAttributesUpdated(TenantId tenantId, EntityId entityId, String scope, List attributes, Throwable e) { + notificationEntityService.logEntityAction(tenantId, entityId, ActionType.ATTRIBUTES_UPDATED, null, + BaseController.toException(e), scope, attributes); + } + private ListenableFuture processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) { SettableFuture futureToSet = SettableFuture.create(); String scope = attributeDeleteMsg.getScope(); diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index dcd354ace5..34f71a9ffa 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -26,6 +26,7 @@ import com.google.protobuf.AbstractMessage; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.MessageLite; +import io.netty.handler.codec.mqtt.MqttQoS; import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; @@ -162,10 +163,11 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.transport.AbstractTransportIntegrationTest; import org.thingsboard.server.transport.lwm2m.AbstractLwM2MIntegrationTest; +import org.thingsboard.server.transport.mqtt.MqttTestCallback; +import org.thingsboard.server.transport.mqtt.MqttTestClient; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -174,11 +176,13 @@ import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertEquals; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import static org.thingsboard.server.common.data.ota.OtaPackageType.FIRMWARE; @TestPropertySource(properties = { "edges.enabled=true", + "transport.mqtt.enabled=true" }) abstract public class BaseEdgeTest extends AbstractControllerTest { @@ -2147,6 +2151,42 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { Assert.assertEquals(queueUpdateMsg.getIdLSB(), savedQueue.getUuidId().getLeastSignificantBits()); } + @Test + public void updateSharedAttributeOnCloudAndValidateDeviceSubscription() throws Exception { + Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge(); + + DeviceCredentials deviceCredentials = doGet("/api/device/" + device.getUuidId() + "/credentials", DeviceCredentials.class); + + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(deviceCredentials.getCredentialsId()); + MqttTestCallback onUpdateCallback = new MqttTestCallback(); + client.setCallback(onUpdateCallback); + client.subscribeAndWait("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE); + + edgeImitator.expectResponsesAmount(1); + + JsonObject attributesData = new JsonObject(); + String attrKey = "sharedAttrName"; + String attrValue = "sharedAttrValue"; + attributesData.addProperty(attrKey, attrValue); + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); + EntityDataProto.Builder entityDataBuilder = EntityDataProto.newBuilder(); + entityDataBuilder.setEntityType(device.getId().getEntityType().name()); + entityDataBuilder.setEntityIdMSB(device.getId().getId().getMostSignificantBits()); + entityDataBuilder.setEntityIdLSB(device.getId().getId().getLeastSignificantBits()); + entityDataBuilder.setAttributesUpdatedMsg(JsonConverter.convertToAttributesProto(attributesData)); + entityDataBuilder.setPostAttributeScope(DataConstants.SHARED_SCOPE); + uplinkMsgBuilder.addEntityData(entityDataBuilder.build()); + + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + Assert.assertTrue(edgeImitator.waitForResponses()); + + Assert.assertTrue(onUpdateCallback.getSubscribeLatch().await(5, TimeUnit.SECONDS)); + + assertEquals(JacksonUtil.OBJECT_MAPPER.createObjectNode().put(attrKey, attrValue), + JacksonUtil.fromBytes(onUpdateCallback.getPayloadBytes())); + } + // Utility methods private Device saveDeviceOnCloudAndVerifyDeliveryToEdge() throws Exception { From 35288bbfe8cf041bcdb04789dd638589b5a5d2e8 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 12 Oct 2022 12:27:22 +0300 Subject: [PATCH 2/2] Removed required=false. Renamed test name --- .../server/service/edge/rpc/processor/BaseEdgeProcessor.java | 2 +- .../src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 2be5a5684d..40d5c8d404 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -93,7 +93,7 @@ public abstract class BaseEdgeProcessor { @Autowired protected TelemetrySubscriptionService tsSubService; - @Autowired(required = false) + @Autowired protected TbNotificationEntityService notificationEntityService; @Autowired diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index 34f71a9ffa..180a345633 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -2152,7 +2152,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { } @Test - public void updateSharedAttributeOnCloudAndValidateDeviceSubscription() throws Exception { + public void sendUpdateSharedAttributeToCloudAndValidateDeviceSubscription() throws Exception { Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge(); DeviceCredentials deviceCredentials = doGet("/api/device/" + device.getUuidId() + "/credentials", DeviceCredentials.class);