From a3944c50477c69748022922f3663ce6acbe808a4 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 22 Jun 2022 15:56:50 +0300 Subject: [PATCH] Introduced new proto for device activity --- .../device/DeviceActorMessageProcessor.java | 7 +++++ .../edge/rpc/processor/BaseEdgeProcessor.java | 10 +++++++ .../rpc/processor/TelemetryEdgeProcessor.java | 30 ++++++++++++++++++- common/cluster-api/src/main/proto/queue.proto | 5 ++++ 4 files changed, 51 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index b0830317a7..28c7a6a499 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -418,6 +418,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { if (msg.hasUplinkNotificationMsg()) { processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg()); } + if (msg.hasDeviceActivity()) { + handleDeviceActivity(context, msg.getDeviceActivity()); + } callback.onSuccess(); } @@ -729,6 +732,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } + private void handleDeviceActivity(TbActorCtx context, TransportProtos.DeviceActivityProto deviceActivityProto) { + systemContext.getDeviceStateService().onDeviceActivity(tenantId, deviceId, deviceActivityProto.getLastActivityTime()); + } + void processCredentialsUpdate(TbActorMsg msg) { if (((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials().getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) { sessions.forEach((k, v) -> { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index c67f7a5a14..8f37d743b5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EdgeUtils; @@ -53,6 +54,8 @@ import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.service.edge.rpc.constructor.AdminSettingsMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AlarmMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AssetMsgConstructor; @@ -142,6 +145,13 @@ public abstract class BaseEdgeProcessor { @Autowired protected OtaPackageService otaPackageService; + @Autowired + protected PartitionService partitionService; + + @Autowired + @Lazy + protected TbQueueProducerProvider producerProvider; + @Autowired protected DataValidator deviceValidator; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java index 5c807f9434..88ea417c24 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java @@ -52,6 +52,8 @@ import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.util.JsonUtils; @@ -61,9 +63,12 @@ import org.thingsboard.server.gen.edge.v1.EntityDataProto; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.util.TbCoreComponent; import javax.annotation.Nullable; +import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -77,6 +82,13 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { private final Gson gson = new Gson(); + private TbQueueProducer> tbCoreMsgProducer; + + @PostConstruct + public void init() { + tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); + } + public List> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) { log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData); List> result = new ArrayList<>(); @@ -95,7 +107,23 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); } if (EntityType.DEVICE.equals(entityId.getEntityType())) { - deviceStateService.onDeviceActivity(tenantId, new DeviceId(entityId.getId()), System.currentTimeMillis()); + DeviceId deviceId = new DeviceId(entityId.getId()); + + TransportProtos.SessionInfoProto sessionInfo = TransportProtos.SessionInfoProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()).build(); + + TransportProtos.DeviceActivityProto deviceActivityProto = TransportProtos.DeviceActivityProto.newBuilder() + .setLastActivityTime(System.currentTimeMillis()).build(); + + TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setDeviceActivity(deviceActivityProto).build(); + + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); + tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(), + TransportProtos.ToCoreMsg.newBuilder().setToDeviceActorMsg(msg).build()), null); } } if (entityData.hasAttributeDeleteMsg()) { diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 3079978bce..7fefe18d4a 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -454,6 +454,10 @@ message GetOtaPackageResponseMsg { string fileName = 8; } +message DeviceActivityProto { + int64 lastActivityTime = 1; +} + //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. message SubscriptionInfoProto { int64 lastActivityTime = 1; @@ -483,6 +487,7 @@ message TransportToDeviceActorMsg { ToDeviceRpcResponseStatusMsg rpcResponseStatusMsg = 10; SendPendingRPCMsg sendPendingRPC = 11; UplinkNotificationMsg uplinkNotificationMsg = 12; + DeviceActivityProto deviceActivity = 13; } message TransportToRuleEngineMsg {