Code review changes: avoid creating device actor if not needed

This commit is contained in:
Volodymyr Babak 2022-06-22 19:42:00 +03:00
parent 5a51028577
commit 731fb1eae8
5 changed files with 32 additions and 17 deletions

View File

@ -418,9 +418,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
if (msg.hasUplinkNotificationMsg()) { if (msg.hasUplinkNotificationMsg()) {
processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg()); processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg());
} }
if (msg.hasDeviceActivity()) {
handleDeviceActivity(context, msg.getDeviceActivity());
}
callback.onSuccess(); callback.onSuccess();
} }
@ -732,10 +729,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
} }
} }
private void handleDeviceActivity(TbActorCtx context, TransportProtos.DeviceActivityProto deviceActivityProto) {
systemContext.getDeviceStateService().onDeviceActivity(tenantId, deviceId, deviceActivityProto.getLastActivityTime());
}
void processCredentialsUpdate(TbActorMsg msg) { void processCredentialsUpdate(TbActorMsg msg) {
if (((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials().getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) { if (((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials().getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) {
sessions.forEach((k, v) -> { sessions.forEach((k, v) -> {

View File

@ -109,21 +109,16 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
if (EntityType.DEVICE.equals(entityId.getEntityType())) { if (EntityType.DEVICE.equals(entityId.getEntityType())) {
DeviceId deviceId = new DeviceId(entityId.getId()); DeviceId deviceId = new DeviceId(entityId.getId());
TransportProtos.SessionInfoProto sessionInfo = TransportProtos.SessionInfoProto.newBuilder() TransportProtos.DeviceActivityProto deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) .setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()).build(); .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
TransportProtos.DeviceActivityProto deviceActivityProto = TransportProtos.DeviceActivityProto.newBuilder()
.setLastActivityTime(System.currentTimeMillis()).build(); .setLastActivityTime(System.currentTimeMillis()).build();
TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setDeviceActivity(deviceActivityProto).build();
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(), tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(),
TransportProtos.ToCoreMsg.newBuilder().setToDeviceActorMsg(msg).build()), null); TransportProtos.ToCoreMsg.newBuilder().setDeviceActivityMsg(deviceActivityMsg).build()), null);
} }
} }
if (entityData.hasAttributeDeleteMsg()) { if (entityData.hasAttributeDeleteMsg()) {

View File

@ -28,6 +28,7 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.MsgType;
@ -236,6 +237,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
} else if (toCoreMsg.hasEdgeNotificationMsg()) { } else if (toCoreMsg.hasEdgeNotificationMsg()) {
log.trace("[{}] Forwarding message to edge service {}", id, toCoreMsg.getEdgeNotificationMsg()); log.trace("[{}] Forwarding message to edge service {}", id, toCoreMsg.getEdgeNotificationMsg());
forwardToEdgeNotificationService(toCoreMsg.getEdgeNotificationMsg(), callback); forwardToEdgeNotificationService(toCoreMsg.getEdgeNotificationMsg(), callback);
} else if (toCoreMsg.hasDeviceActivityMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceActivityMsg());
forwardToStateService(toCoreMsg.getDeviceActivityMsg(), callback);
} else if (!toCoreMsg.getToDeviceActorNotificationMsg().isEmpty()) { } else if (!toCoreMsg.getToDeviceActorNotificationMsg().isEmpty()) {
Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray()); Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray());
if (actorMsg.isPresent()) { if (actorMsg.isPresent()) {
@ -520,6 +524,20 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
stateService.onQueueMsg(deviceStateServiceMsg, callback); stateService.onQueueMsg(deviceStateServiceMsg, callback);
} }
private void forwardToStateService(TransportProtos.DeviceActivityProto deviceActivityMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceActivityMsg);
}
TenantId tenantId = TenantId.fromUUID(new UUID(deviceActivityMsg.getTenantIdMSB(), deviceActivityMsg.getTenantIdLSB()));
DeviceId deviceId = new DeviceId(new UUID(deviceActivityMsg.getDeviceIdMSB(), deviceActivityMsg.getDeviceIdLSB()));
try {
stateService.onDeviceActivity(tenantId, deviceId, deviceActivityMsg.getLastActivityTime());
callback.onSuccess();
} catch (Exception e) {
callback.onFailure(new RuntimeException("Failed update device activity for device [" + deviceId.getId() + "]!", e));
}
}
private void forwardToEdgeNotificationService(EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback) { private void forwardToEdgeNotificationService(EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback) {
if (statsEnabled) { if (statsEnabled) {
stats.log(edgeNotificationMsg); stats.log(edgeNotificationMsg);

View File

@ -112,6 +112,11 @@ public class TbCoreConsumerStats {
edgeNotificationsCounter.increment(); edgeNotificationsCounter.increment();
} }
public void log(TransportProtos.DeviceActivityProto msg) {
totalCounter.increment();
deviceStateCounter.increment();
}
public void log(TransportProtos.SubscriptionMgrMsgProto msg) { public void log(TransportProtos.SubscriptionMgrMsgProto msg) {
totalCounter.increment(); totalCounter.increment();
subscriptionMsgCounter.increment(); subscriptionMsgCounter.increment();

View File

@ -455,7 +455,11 @@ message GetOtaPackageResponseMsg {
} }
message DeviceActivityProto { message DeviceActivityProto {
int64 lastActivityTime = 1; int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 deviceIdMSB = 3;
int64 deviceIdLSB = 4;
int64 lastActivityTime = 5;
} }
//Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.
@ -487,7 +491,6 @@ message TransportToDeviceActorMsg {
ToDeviceRpcResponseStatusMsg rpcResponseStatusMsg = 10; ToDeviceRpcResponseStatusMsg rpcResponseStatusMsg = 10;
SendPendingRPCMsg sendPendingRPC = 11; SendPendingRPCMsg sendPendingRPC = 11;
UplinkNotificationMsg uplinkNotificationMsg = 12; UplinkNotificationMsg uplinkNotificationMsg = 12;
DeviceActivityProto deviceActivity = 13;
} }
message TransportToRuleEngineMsg { message TransportToRuleEngineMsg {
@ -907,6 +910,7 @@ message ToCoreMsg {
SubscriptionMgrMsgProto toSubscriptionMgrMsg = 3; SubscriptionMgrMsgProto toSubscriptionMgrMsg = 3;
bytes toDeviceActorNotificationMsg = 4; bytes toDeviceActorNotificationMsg = 4;
EdgeNotificationMsgProto edgeNotificationMsg = 5; EdgeNotificationMsgProto edgeNotificationMsg = 5;
DeviceActivityProto deviceActivityMsg = 6;
} }
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */ /* High priority messages with low latency are handled by ThingsBoard Core Service separately */