Notify devices in case shared attribute updates from edge
This commit is contained in:
parent
0813f3a972
commit
ecda9c8e2f
@ -74,9 +74,11 @@ import org.thingsboard.server.service.edge.rpc.constructor.RuleChainMsgConstruct
|
|||||||
import org.thingsboard.server.service.edge.rpc.constructor.UserMsgConstructor;
|
import org.thingsboard.server.service.edge.rpc.constructor.UserMsgConstructor;
|
||||||
import org.thingsboard.server.service.edge.rpc.constructor.WidgetTypeMsgConstructor;
|
import org.thingsboard.server.service.edge.rpc.constructor.WidgetTypeMsgConstructor;
|
||||||
import org.thingsboard.server.service.edge.rpc.constructor.WidgetsBundleMsgConstructor;
|
import org.thingsboard.server.service.edge.rpc.constructor.WidgetsBundleMsgConstructor;
|
||||||
|
import org.thingsboard.server.service.entitiy.TbNotificationEntityService;
|
||||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||||
import org.thingsboard.server.service.state.DeviceStateService;
|
import org.thingsboard.server.service.state.DeviceStateService;
|
||||||
|
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -88,6 +90,12 @@ public abstract class BaseEdgeProcessor {
|
|||||||
|
|
||||||
protected static final int DEFAULT_PAGE_SIZE = 1000;
|
protected static final int DEFAULT_PAGE_SIZE = 1000;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
protected TelemetrySubscriptionService tsSubService;
|
||||||
|
|
||||||
|
@Autowired(required = false)
|
||||||
|
protected TbNotificationEntityService notificationEntityService;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
protected RuleChainService ruleChainService;
|
protected RuleChainService ruleChainService;
|
||||||
|
|
||||||
|
|||||||
@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.EdgeUtils;
|
|||||||
import org.thingsboard.server.common.data.EntityType;
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
import org.thingsboard.server.common.data.EntityView;
|
import org.thingsboard.server.common.data.EntityView;
|
||||||
import org.thingsboard.server.common.data.asset.Asset;
|
import org.thingsboard.server.common.data.asset.Asset;
|
||||||
|
import org.thingsboard.server.common.data.audit.ActionType;
|
||||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||||
import org.thingsboard.server.common.data.id.AssetId;
|
import org.thingsboard.server.common.data.id.AssetId;
|
||||||
@ -57,6 +58,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
|||||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
|
import org.thingsboard.server.common.msg.session.SessionMsgType;
|
||||||
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
|
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
|
||||||
import org.thingsboard.server.common.transport.util.JsonUtils;
|
import org.thingsboard.server.common.transport.util.JsonUtils;
|
||||||
|
import org.thingsboard.server.controller.BaseController;
|
||||||
import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg;
|
import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg;
|
||||||
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||||
import org.thingsboard.server.gen.edge.v1.EntityDataProto;
|
import org.thingsboard.server.gen.edge.v1.EntityDataProto;
|
||||||
@ -101,7 +103,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
|||||||
}
|
}
|
||||||
if (entityData.hasAttributesUpdatedMsg()) {
|
if (entityData.hasAttributesUpdatedMsg()) {
|
||||||
metaData.putValue("scope", entityData.getPostAttributeScope());
|
metaData.putValue("scope", entityData.getPostAttributeScope());
|
||||||
result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData));
|
result.add(processAttributesUpdate(tenantId, entityId, entityData.getAttributesUpdatedMsg(), metaData));
|
||||||
}
|
}
|
||||||
if (entityData.hasPostTelemetryMsg()) {
|
if (entityData.hasPostTelemetryMsg()) {
|
||||||
result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData));
|
result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData));
|
||||||
@ -221,39 +223,36 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
|||||||
return futureToSet;
|
return futureToSet;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<Void> processAttributesUpdate(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) {
|
private ListenableFuture<Void> processAttributesUpdate(TenantId tenantId,
|
||||||
|
EntityId entityId,
|
||||||
|
TransportProtos.PostAttributeMsg msg,
|
||||||
|
TbMsgMetaData metaData) {
|
||||||
SettableFuture<Void> futureToSet = SettableFuture.create();
|
SettableFuture<Void> futureToSet = SettableFuture.create();
|
||||||
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
|
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
|
||||||
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(json);
|
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json));
|
||||||
ListenableFuture<List<String>> future = attributesService.save(tenantId, entityId, metaData.getValue("scope"), new ArrayList<>(attributes));
|
String scope = metaData.getValue("scope");
|
||||||
Futures.addCallback(future, new FutureCallback<>() {
|
tsSubService.saveAndNotify(tenantId, entityId, scope, attributes, new FutureCallback<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(@Nullable List<String> keys) {
|
public void onSuccess(@Nullable Void tmp) {
|
||||||
var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
|
logAttributesUpdated(tenantId, entityId, scope, attributes, null);
|
||||||
TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), DataConstants.ATTRIBUTES_UPDATED, entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null);
|
futureToSet.set(null);
|
||||||
tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(TbQueueMsgMetadata metadata) {
|
|
||||||
futureToSet.set(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Throwable t) {
|
|
||||||
log.error("Can't process attributes update [{}]", msg, t);
|
|
||||||
futureToSet.setException(t);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
log.error("Can't process attributes update [{}]", msg, t);
|
log.error("Can't process attributes update [{}]", msg, t);
|
||||||
|
logAttributesUpdated(tenantId, entityId, scope, attributes, t);
|
||||||
futureToSet.setException(t);
|
futureToSet.setException(t);
|
||||||
}
|
}
|
||||||
}, dbCallbackExecutorService);
|
});
|
||||||
return futureToSet;
|
return futureToSet;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void logAttributesUpdated(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, Throwable e) {
|
||||||
|
notificationEntityService.logEntityAction(tenantId, entityId, ActionType.ATTRIBUTES_UPDATED, null,
|
||||||
|
BaseController.toException(e), scope, attributes);
|
||||||
|
}
|
||||||
|
|
||||||
private ListenableFuture<Void> processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) {
|
private ListenableFuture<Void> processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) {
|
||||||
SettableFuture<Void> futureToSet = SettableFuture.create();
|
SettableFuture<Void> futureToSet = SettableFuture.create();
|
||||||
String scope = attributeDeleteMsg.getScope();
|
String scope = attributeDeleteMsg.getScope();
|
||||||
|
|||||||
@ -26,6 +26,7 @@ import com.google.protobuf.AbstractMessage;
|
|||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import com.google.protobuf.InvalidProtocolBufferException;
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
import com.google.protobuf.MessageLite;
|
import com.google.protobuf.MessageLite;
|
||||||
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||||
import org.awaitility.Awaitility;
|
import org.awaitility.Awaitility;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
@ -162,10 +163,11 @@ import org.thingsboard.server.gen.transport.TransportProtos;
|
|||||||
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
|
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
|
||||||
import org.thingsboard.server.transport.AbstractTransportIntegrationTest;
|
import org.thingsboard.server.transport.AbstractTransportIntegrationTest;
|
||||||
import org.thingsboard.server.transport.lwm2m.AbstractLwM2MIntegrationTest;
|
import org.thingsboard.server.transport.lwm2m.AbstractLwM2MIntegrationTest;
|
||||||
|
import org.thingsboard.server.transport.mqtt.MqttTestCallback;
|
||||||
|
import org.thingsboard.server.transport.mqtt.MqttTestClient;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
@ -174,11 +176,13 @@ import java.util.TreeMap;
|
|||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
|
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
|
||||||
import static org.thingsboard.server.common.data.ota.OtaPackageType.FIRMWARE;
|
import static org.thingsboard.server.common.data.ota.OtaPackageType.FIRMWARE;
|
||||||
|
|
||||||
@TestPropertySource(properties = {
|
@TestPropertySource(properties = {
|
||||||
"edges.enabled=true",
|
"edges.enabled=true",
|
||||||
|
"transport.mqtt.enabled=true"
|
||||||
})
|
})
|
||||||
abstract public class BaseEdgeTest extends AbstractControllerTest {
|
abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||||
|
|
||||||
@ -2147,6 +2151,42 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
|||||||
Assert.assertEquals(queueUpdateMsg.getIdLSB(), savedQueue.getUuidId().getLeastSignificantBits());
|
Assert.assertEquals(queueUpdateMsg.getIdLSB(), savedQueue.getUuidId().getLeastSignificantBits());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void updateSharedAttributeOnCloudAndValidateDeviceSubscription() throws Exception {
|
||||||
|
Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge();
|
||||||
|
|
||||||
|
DeviceCredentials deviceCredentials = doGet("/api/device/" + device.getUuidId() + "/credentials", DeviceCredentials.class);
|
||||||
|
|
||||||
|
MqttTestClient client = new MqttTestClient();
|
||||||
|
client.connectAndWait(deviceCredentials.getCredentialsId());
|
||||||
|
MqttTestCallback onUpdateCallback = new MqttTestCallback();
|
||||||
|
client.setCallback(onUpdateCallback);
|
||||||
|
client.subscribeAndWait("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE);
|
||||||
|
|
||||||
|
edgeImitator.expectResponsesAmount(1);
|
||||||
|
|
||||||
|
JsonObject attributesData = new JsonObject();
|
||||||
|
String attrKey = "sharedAttrName";
|
||||||
|
String attrValue = "sharedAttrValue";
|
||||||
|
attributesData.addProperty(attrKey, attrValue);
|
||||||
|
UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
|
||||||
|
EntityDataProto.Builder entityDataBuilder = EntityDataProto.newBuilder();
|
||||||
|
entityDataBuilder.setEntityType(device.getId().getEntityType().name());
|
||||||
|
entityDataBuilder.setEntityIdMSB(device.getId().getId().getMostSignificantBits());
|
||||||
|
entityDataBuilder.setEntityIdLSB(device.getId().getId().getLeastSignificantBits());
|
||||||
|
entityDataBuilder.setAttributesUpdatedMsg(JsonConverter.convertToAttributesProto(attributesData));
|
||||||
|
entityDataBuilder.setPostAttributeScope(DataConstants.SHARED_SCOPE);
|
||||||
|
uplinkMsgBuilder.addEntityData(entityDataBuilder.build());
|
||||||
|
|
||||||
|
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
|
||||||
|
Assert.assertTrue(edgeImitator.waitForResponses());
|
||||||
|
|
||||||
|
Assert.assertTrue(onUpdateCallback.getSubscribeLatch().await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
assertEquals(JacksonUtil.OBJECT_MAPPER.createObjectNode().put(attrKey, attrValue),
|
||||||
|
JacksonUtil.fromBytes(onUpdateCallback.getPayloadBytes()));
|
||||||
|
}
|
||||||
|
|
||||||
// Utility methods
|
// Utility methods
|
||||||
|
|
||||||
private Device saveDeviceOnCloudAndVerifyDeliveryToEdge() throws Exception {
|
private Device saveDeviceOnCloudAndVerifyDeliveryToEdge() throws Exception {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user