Edge telemetry processor - added support of USER entity

This commit is contained in:
Volodymyr Babak 2022-12-13 15:41:53 +02:00
parent 0cd2ad03ad
commit 055fbce062
2 changed files with 37 additions and 29 deletions

View File

@ -48,7 +48,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@ -71,9 +71,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Component
@Slf4j
@ -325,6 +323,9 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
case CUSTOMER:
entityId = new CustomerId(edgeEvent.getEntityId());
break;
case USER:
entityId = new UserId(edgeEvent.getEntityId());
break;
case EDGE:
entityId = new EdgeId(edgeEvent.getEntityId());
break;

View File

@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg;
import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg;
import org.thingsboard.server.gen.edge.v1.EntityDataProto;
@ -67,36 +68,11 @@ abstract public class BaseTelemetryEdgeTest extends AbstractEdgeTest {
public void testAttributes() throws Exception {
Device device = findDeviceByName("Edge Device 1");
testAttributesUpdatedMsg(device);
testAttributesUpdatedMsg(device.getId());
testPostAttributesMsg(device);
testAttributesDeleteMsg(device);
}
private void testAttributesUpdatedMsg(Device device) throws Exception {
String attributesData = "{\"scope\":\"SERVER_SCOPE\",\"kv\":{\"key1\":\"value1\"}}";
JsonNode attributesEntityData = mapper.readTree(attributesData);
EdgeEvent edgeEvent1 = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.ATTRIBUTES_UPDATED, device.getId().getId(), EdgeEventType.DEVICE, attributesEntityData);
edgeImitator.expectMessageAmount(1);
edgeEventService.saveAsync(edgeEvent1).get();
clusterService.onEdgeEventUpdate(tenantId, edge.getId());
Assert.assertTrue(edgeImitator.waitForMessages());
AbstractMessage latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof EntityDataProto);
EntityDataProto latestEntityDataMsg = (EntityDataProto) latestMessage;
Assert.assertEquals(device.getUuidId().getMostSignificantBits(), latestEntityDataMsg.getEntityIdMSB());
Assert.assertEquals(device.getUuidId().getLeastSignificantBits(), latestEntityDataMsg.getEntityIdLSB());
Assert.assertEquals(device.getId().getEntityType().name(), latestEntityDataMsg.getEntityType());
Assert.assertEquals("SERVER_SCOPE", latestEntityDataMsg.getPostAttributeScope());
Assert.assertTrue(latestEntityDataMsg.hasAttributesUpdatedMsg());
TransportProtos.PostAttributeMsg attributesUpdatedMsg = latestEntityDataMsg.getAttributesUpdatedMsg();
Assert.assertEquals(1, attributesUpdatedMsg.getKvCount());
TransportProtos.KeyValueProto keyValueProto = attributesUpdatedMsg.getKv(0);
Assert.assertEquals("key1", keyValueProto.getKey());
Assert.assertEquals("value1", keyValueProto.getStringV());
}
private void testPostAttributesMsg(Device device) throws Exception {
String postAttributesData = "{\"scope\":\"SERVER_SCOPE\",\"kv\":{\"key2\":\"value2\"}}";
JsonNode postAttributesEntityData = mapper.readTree(postAttributesData);
@ -226,4 +202,35 @@ abstract public class BaseTelemetryEdgeTest extends AbstractEdgeTest {
edgeImitator.setRandomFailuresOnTimeseriesDownlink(false);
}
@Test
public void testAttributesUpdatedMsg_userEntity() throws Exception {
testAttributesUpdatedMsg(tenantAdmin.getId());
}
private void testAttributesUpdatedMsg(EntityId entityId) throws Exception {
String attributesData = "{\"scope\":\"SERVER_SCOPE\",\"kv\":{\"key1\":\"value1\"}}";
JsonNode attributesEntityData = mapper.readTree(attributesData);
EdgeEvent edgeEvent1 = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.ATTRIBUTES_UPDATED, entityId.getId(), EdgeEventType.valueOf(entityId.getEntityType().name()), attributesEntityData);
edgeImitator.expectMessageAmount(1);
edgeEventService.saveAsync(edgeEvent1).get();
clusterService.onEdgeEventUpdate(tenantId, edge.getId());
Assert.assertTrue(edgeImitator.waitForMessages());
AbstractMessage latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof EntityDataProto);
EntityDataProto latestEntityDataMsg = (EntityDataProto) latestMessage;
Assert.assertEquals(entityId.getId().getMostSignificantBits(), latestEntityDataMsg.getEntityIdMSB());
Assert.assertEquals(entityId.getId().getLeastSignificantBits(), latestEntityDataMsg.getEntityIdLSB());
Assert.assertEquals(entityId.getEntityType().name(), latestEntityDataMsg.getEntityType());
Assert.assertEquals("SERVER_SCOPE", latestEntityDataMsg.getPostAttributeScope());
Assert.assertTrue(latestEntityDataMsg.hasAttributesUpdatedMsg());
TransportProtos.PostAttributeMsg attributesUpdatedMsg = latestEntityDataMsg.getAttributesUpdatedMsg();
Assert.assertEquals(1, attributesUpdatedMsg.getKvCount());
TransportProtos.KeyValueProto keyValueProto = attributesUpdatedMsg.getKv(0);
Assert.assertEquals("key1", keyValueProto.getKey());
Assert.assertEquals("value1", keyValueProto.getStringV());
}
}