Introduced new proto for device activity

This commit is contained in:
Volodymyr Babak 2022-06-22 15:56:50 +03:00
parent 87834c2ccd
commit a3944c5047
4 changed files with 51 additions and 1 deletions

View File

@ -418,6 +418,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
if (msg.hasUplinkNotificationMsg()) { if (msg.hasUplinkNotificationMsg()) {
processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg()); processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg());
} }
if (msg.hasDeviceActivity()) {
handleDeviceActivity(context, msg.getDeviceActivity());
}
callback.onSuccess(); callback.onSuccess();
} }
@ -729,6 +732,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
} }
} }
private void handleDeviceActivity(TbActorCtx context, TransportProtos.DeviceActivityProto deviceActivityProto) {
systemContext.getDeviceStateService().onDeviceActivity(tenantId, deviceId, deviceActivityProto.getLastActivityTime());
}
void processCredentialsUpdate(TbActorMsg msg) { void processCredentialsUpdate(TbActorMsg msg) {
if (((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials().getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) { if (((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials().getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) {
sessions.forEach((k, v) -> { sessions.forEach((k, v) -> {

View File

@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EdgeUtils;
@ -53,6 +54,8 @@ import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.service.edge.rpc.constructor.AdminSettingsMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AdminSettingsMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.AlarmMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AlarmMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.AssetMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AssetMsgConstructor;
@ -142,6 +145,13 @@ public abstract class BaseEdgeProcessor {
@Autowired @Autowired
protected OtaPackageService otaPackageService; protected OtaPackageService otaPackageService;
@Autowired
protected PartitionService partitionService;
@Autowired
@Lazy
protected TbQueueProducerProvider producerProvider;
@Autowired @Autowired
protected DataValidator<Device> deviceValidator; protected DataValidator<Device> deviceValidator;

View File

@ -52,6 +52,8 @@ import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.util.JsonUtils; import org.thingsboard.server.common.transport.util.JsonUtils;
@ -61,9 +63,12 @@ import org.thingsboard.server.gen.edge.v1.EntityDataProto;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -77,6 +82,13 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
private final Gson gson = new Gson(); private final Gson gson = new Gson();
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> tbCoreMsgProducer;
@PostConstruct
public void init() {
tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
}
public List<ListenableFuture<Void>> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) { public List<ListenableFuture<Void>> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) {
log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData); log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData);
List<ListenableFuture<Void>> result = new ArrayList<>(); List<ListenableFuture<Void>> result = new ArrayList<>();
@ -95,7 +107,23 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData));
} }
if (EntityType.DEVICE.equals(entityId.getEntityType())) { if (EntityType.DEVICE.equals(entityId.getEntityType())) {
deviceStateService.onDeviceActivity(tenantId, new DeviceId(entityId.getId()), System.currentTimeMillis()); DeviceId deviceId = new DeviceId(entityId.getId());
TransportProtos.SessionInfoProto sessionInfo = TransportProtos.SessionInfoProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()).build();
TransportProtos.DeviceActivityProto deviceActivityProto = TransportProtos.DeviceActivityProto.newBuilder()
.setLastActivityTime(System.currentTimeMillis()).build();
TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setDeviceActivity(deviceActivityProto).build();
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(),
TransportProtos.ToCoreMsg.newBuilder().setToDeviceActorMsg(msg).build()), null);
} }
} }
if (entityData.hasAttributeDeleteMsg()) { if (entityData.hasAttributeDeleteMsg()) {

View File

@ -454,6 +454,10 @@ message GetOtaPackageResponseMsg {
string fileName = 8; string fileName = 8;
} }
message DeviceActivityProto {
int64 lastActivityTime = 1;
}
//Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.
message SubscriptionInfoProto { message SubscriptionInfoProto {
int64 lastActivityTime = 1; int64 lastActivityTime = 1;
@ -483,6 +487,7 @@ message TransportToDeviceActorMsg {
ToDeviceRpcResponseStatusMsg rpcResponseStatusMsg = 10; ToDeviceRpcResponseStatusMsg rpcResponseStatusMsg = 10;
SendPendingRPCMsg sendPendingRPC = 11; SendPendingRPCMsg sendPendingRPC = 11;
UplinkNotificationMsg uplinkNotificationMsg = 12; UplinkNotificationMsg uplinkNotificationMsg = 12;
DeviceActivityProto deviceActivity = 13;
} }
message TransportToRuleEngineMsg { message TransportToRuleEngineMsg {