From 6adf0f256e6cf183d36d203c66f96628f9ed1c4d Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 10 Jun 2022 14:48:20 +0300 Subject: [PATCH] Report device activity from edge service - set active flag to true on cloud --- .../service/edge/rpc/EdgeGrpcSession.java | 2 +- .../edge/rpc/processor/BaseEdgeProcessor.java | 10 ++++ .../rpc/processor/TelemetryEdgeProcessor.java | 54 +++++++++++++++++-- .../thingsboard/server/edge/BaseEdgeTest.java | 27 +++++++--- 4 files changed, 81 insertions(+), 12 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 6c991444b2..bb67b760b5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -571,7 +571,7 @@ public final class EdgeGrpcSession implements Closeable { try { if (uplinkMsg.getEntityDataCount() > 0) { for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) { - result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), edge.getCustomerId(), entityData)); + result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), edge.getCustomerId(), entityData, sessionId)); } } if (uplinkMsg.getDeviceUpdateMsgCount() > 0) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 09bef32af9..46b36a1932 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EdgeUtils; @@ -52,6 +53,8 @@ import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.service.edge.rpc.constructor.AdminSettingsMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AlarmMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AssetMsgConstructor; @@ -137,6 +140,13 @@ public abstract class BaseEdgeProcessor { @Autowired protected WidgetTypeService widgetTypeService; + @Autowired + protected PartitionService partitionService; + + @Autowired + @Lazy + protected TbQueueProducerProvider producerProvider; + @Autowired protected DataValidator deviceValidator; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java index 769d988507..cc17970ec8 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java @@ -52,6 +52,8 @@ import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.util.JsonUtils; @@ -61,9 +63,12 @@ import org.thingsboard.server.gen.edge.v1.EntityDataProto; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.util.TbCoreComponent; import javax.annotation.Nullable; +import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -77,14 +82,19 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { private final Gson gson = new Gson(); - public List> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) { + private TbQueueProducer> tbCoreMsgProducer; + + @PostConstruct + public void init() { + tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); + } + + public List> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData, UUID edgeSessionId) { log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData); List> result = new ArrayList<>(); EntityId entityId = constructEntityId(entityData); if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) { - // @voba - in terms of performance we should not fetch device from DB by id - // TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId); - TbMsgMetaData metaData = new TbMsgMetaData(); + TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId); metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE); if (entityData.hasPostAttributesMsg()) { result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData)); @@ -96,6 +106,11 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { if (entityData.hasPostTelemetryMsg()) { result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); } + if (EntityType.DEVICE.equals(entityId.getEntityType())) { + Device device = deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())); + // for edge context sessionId is exact edgeSessionId + reportActivity(device, edgeSessionId); + } } if (entityData.hasAttributeDeleteMsg()) { result.add(processAttributeDeleteMsg(tenantId, entityId, entityData.getAttributeDeleteMsg(), entityData.getEntityType())); @@ -103,6 +118,37 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { return result; } + private void reportActivity(Device device, UUID sessionId) { + TransportProtos.SessionInfoProto.Builder builder = TransportProtos.SessionInfoProto.newBuilder() + .setSessionIdMSB(sessionId.getMostSignificantBits()) + .setSessionIdLSB(sessionId.getLeastSignificantBits()) + .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits()) + .setDeviceIdMSB(device.getId().getId().getMostSignificantBits()) + .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits()) + .setDeviceName(device.getName()) + .setDeviceType(device.getType()) + .setDeviceProfileIdMSB(device.getDeviceProfileId().getId().getMostSignificantBits()) + .setDeviceProfileIdLSB(device.getDeviceProfileId().getId().getLeastSignificantBits()); + + if (device.getCustomerId() != null && !device.getCustomerId().isNullUid()) { + builder.setCustomerIdMSB(device.getCustomerId().getId().getMostSignificantBits()); + builder.setCustomerIdLSB(device.getCustomerId().getId().getLeastSignificantBits()); + } + reportActivity(device.getTenantId(), device.getId(), device.getUuidId(), builder.build()); + } + + private void reportActivity(TenantId tenantId, DeviceId deviceId, UUID routingKey, TransportProtos.SessionInfoProto sessionInfo) { + TransportProtos.SubscriptionInfoProto subscriptionInfoProto = TransportProtos.SubscriptionInfoProto.newBuilder() + .setAttributeSubscription(false).setRpcSubscription(false) + .setLastActivityTime(System.currentTimeMillis()).build(); + TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setSubscriptionInfo(subscriptionInfoProto).build(); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); + tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, + TransportProtos.ToCoreMsg.newBuilder().setToDeviceActorMsg(msg).build()), null); + } + private TbMsgMetaData constructBaseMsgMetadata(TenantId tenantId, EntityId entityId) { TbMsgMetaData metaData = new TbMsgMetaData(); switch (entityId.getEntityType()) { diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index 61bb53b060..a2f72a86d2 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -1339,17 +1339,17 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { String timeseriesKey = "key"; String timeseriesValue = "25"; data.addProperty(timeseriesKey, timeseriesValue); - UplinkMsg.Builder uplinkMsgBuilder1 = UplinkMsg.newBuilder(); + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); EntityDataProto.Builder entityDataBuilder = EntityDataProto.newBuilder(); entityDataBuilder.setPostTelemetryMsg(JsonConverter.convertToTelemetryProto(data, System.currentTimeMillis())); entityDataBuilder.setEntityType(device.getId().getEntityType().name()); entityDataBuilder.setEntityIdMSB(device.getUuidId().getMostSignificantBits()); entityDataBuilder.setEntityIdLSB(device.getUuidId().getLeastSignificantBits()); testAutoGeneratedCodeByProtobuf(entityDataBuilder); - uplinkMsgBuilder1.addEntityData(entityDataBuilder.build()); + uplinkMsgBuilder.addEntityData(entityDataBuilder.build()); - testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder1); - edgeImitator.sendUplinkMsg(uplinkMsgBuilder1.build()); + testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); JsonObject attributesData = new JsonObject(); String attributesKey = "test_attr"; @@ -1381,9 +1381,22 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { String attributeValuesUrl = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/attributes/" + DataConstants.SERVER_SCOPE; List> attributes = doGetAsyncTyped(attributeValuesUrl, new TypeReference<>() {}); - Assert.assertEquals(2, attributes.size()); - var result = attributes.stream().filter(kv -> kv.get("key").equals(attributesKey)).filter(kv -> kv.get("value").equals(attributesValue)).findFirst(); - Assert.assertTrue(result.isPresent()); + + Assert.assertEquals(3, attributes.size()); + + Optional> activeAttributeOpt = getAttributeByKey("active", attributes); + Assert.assertTrue(activeAttributeOpt.isPresent()); + Map activeAttribute = activeAttributeOpt.get(); + Assert.assertEquals("true", activeAttribute.get("value")); + + Optional> customAttributeOpt = getAttributeByKey(attributesKey, attributes); + Assert.assertTrue(customAttributeOpt.isPresent()); + Map customAttribute = customAttributeOpt.get(); + Assert.assertEquals(attributesValue, customAttribute.get("value")); + } + + private Optional> getAttributeByKey(String key, List> attributes) { + return attributes.stream().filter(kv -> kv.get("key").equals(key)).findFirst(); } private Map>> loadDeviceTimeseries(Device device, String timeseriesKey) throws Exception {