From 59bcc2f600d162d0d5ff064bb3d309690e2ac52f Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 22 Jun 2022 12:51:55 +0300 Subject: [PATCH] Code review changes --- .../edge/rpc/processor/BaseEdgeProcessor.java | 10 ---- .../rpc/processor/TelemetryEdgeProcessor.java | 47 +------------------ 2 files changed, 1 insertion(+), 56 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 46b36a1932..09bef32af9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -21,7 +21,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Lazy; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EdgeUtils; @@ -53,8 +52,6 @@ import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; -import org.thingsboard.server.queue.discovery.PartitionService; -import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.service.edge.rpc.constructor.AdminSettingsMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AlarmMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AssetMsgConstructor; @@ -140,13 +137,6 @@ public abstract class BaseEdgeProcessor { @Autowired protected WidgetTypeService widgetTypeService; - @Autowired - protected PartitionService partitionService; - - @Autowired - @Lazy - protected TbQueueProducerProvider producerProvider; - @Autowired protected DataValidator deviceValidator; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java index cc17970ec8..5858024292 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java @@ -52,8 +52,6 @@ import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; -import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.util.JsonUtils; @@ -63,12 +61,9 @@ import org.thingsboard.server.gen.edge.v1.EntityDataProto; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; -import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.util.TbCoreComponent; import javax.annotation.Nullable; -import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -82,13 +77,6 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { private final Gson gson = new Gson(); - private TbQueueProducer> tbCoreMsgProducer; - - @PostConstruct - public void init() { - tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); - } - public List> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData, UUID edgeSessionId) { log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData); List> result = new ArrayList<>(); @@ -107,9 +95,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); } if (EntityType.DEVICE.equals(entityId.getEntityType())) { - Device device = deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())); - // for edge context sessionId is exact edgeSessionId - reportActivity(device, edgeSessionId); + deviceStateService.onDeviceActivity(tenantId, new DeviceId(entityId.getId()), System.currentTimeMillis()); } } if (entityData.hasAttributeDeleteMsg()) { @@ -118,37 +104,6 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { return result; } - private void reportActivity(Device device, UUID sessionId) { - TransportProtos.SessionInfoProto.Builder builder = TransportProtos.SessionInfoProto.newBuilder() - .setSessionIdMSB(sessionId.getMostSignificantBits()) - .setSessionIdLSB(sessionId.getLeastSignificantBits()) - .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits()) - .setDeviceIdMSB(device.getId().getId().getMostSignificantBits()) - .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits()) - .setDeviceName(device.getName()) - .setDeviceType(device.getType()) - .setDeviceProfileIdMSB(device.getDeviceProfileId().getId().getMostSignificantBits()) - .setDeviceProfileIdLSB(device.getDeviceProfileId().getId().getLeastSignificantBits()); - - if (device.getCustomerId() != null && !device.getCustomerId().isNullUid()) { - builder.setCustomerIdMSB(device.getCustomerId().getId().getMostSignificantBits()); - builder.setCustomerIdLSB(device.getCustomerId().getId().getLeastSignificantBits()); - } - reportActivity(device.getTenantId(), device.getId(), device.getUuidId(), builder.build()); - } - - private void reportActivity(TenantId tenantId, DeviceId deviceId, UUID routingKey, TransportProtos.SessionInfoProto sessionInfo) { - TransportProtos.SubscriptionInfoProto subscriptionInfoProto = TransportProtos.SubscriptionInfoProto.newBuilder() - .setAttributeSubscription(false).setRpcSubscription(false) - .setLastActivityTime(System.currentTimeMillis()).build(); - TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setSubscriptionInfo(subscriptionInfoProto).build(); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); - tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, - TransportProtos.ToCoreMsg.newBuilder().setToDeviceActorMsg(msg).build()), null); - } - private TbMsgMetaData constructBaseMsgMetadata(TenantId tenantId, EntityId entityId) { TbMsgMetaData metaData = new TbMsgMetaData(); switch (entityId.getEntityType()) {