From 6adf0f256e6cf183d36d203c66f96628f9ed1c4d Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 10 Jun 2022 14:48:20 +0300 Subject: [PATCH 1/8] Report device activity from edge service - set active flag to true on cloud --- .../service/edge/rpc/EdgeGrpcSession.java | 2 +- .../edge/rpc/processor/BaseEdgeProcessor.java | 10 ++++ .../rpc/processor/TelemetryEdgeProcessor.java | 54 +++++++++++++++++-- .../thingsboard/server/edge/BaseEdgeTest.java | 27 +++++++--- 4 files changed, 81 insertions(+), 12 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 6c991444b2..bb67b760b5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -571,7 +571,7 @@ public final class EdgeGrpcSession implements Closeable { try { if (uplinkMsg.getEntityDataCount() > 0) { for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) { - result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), edge.getCustomerId(), entityData)); + result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), edge.getCustomerId(), entityData, sessionId)); } } if (uplinkMsg.getDeviceUpdateMsgCount() > 0) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 09bef32af9..46b36a1932 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EdgeUtils; @@ -52,6 +53,8 @@ import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.service.edge.rpc.constructor.AdminSettingsMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AlarmMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AssetMsgConstructor; @@ -137,6 +140,13 @@ public abstract class BaseEdgeProcessor { @Autowired protected WidgetTypeService widgetTypeService; + @Autowired + protected PartitionService partitionService; + + @Autowired + @Lazy + protected TbQueueProducerProvider producerProvider; + @Autowired protected DataValidator deviceValidator; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java index 769d988507..cc17970ec8 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java @@ -52,6 +52,8 @@ import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.util.JsonUtils; @@ -61,9 +63,12 @@ import org.thingsboard.server.gen.edge.v1.EntityDataProto; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.util.TbCoreComponent; import javax.annotation.Nullable; +import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -77,14 +82,19 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { private final Gson gson = new Gson(); - public List> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) { + private TbQueueProducer> tbCoreMsgProducer; + + @PostConstruct + public void init() { + tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); + } + + public List> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData, UUID edgeSessionId) { log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData); List> result = new ArrayList<>(); EntityId entityId = constructEntityId(entityData); if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) { - // @voba - in terms of performance we should not fetch device from DB by id - // TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId); - TbMsgMetaData metaData = new TbMsgMetaData(); + TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId); metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE); if (entityData.hasPostAttributesMsg()) { result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData)); @@ -96,6 +106,11 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { if (entityData.hasPostTelemetryMsg()) { result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); } + if (EntityType.DEVICE.equals(entityId.getEntityType())) { + Device device = deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())); + // for edge context sessionId is exact edgeSessionId + reportActivity(device, edgeSessionId); + } } if (entityData.hasAttributeDeleteMsg()) { result.add(processAttributeDeleteMsg(tenantId, entityId, entityData.getAttributeDeleteMsg(), entityData.getEntityType())); @@ -103,6 +118,37 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { return result; } + private void reportActivity(Device device, UUID sessionId) { + TransportProtos.SessionInfoProto.Builder builder = TransportProtos.SessionInfoProto.newBuilder() + .setSessionIdMSB(sessionId.getMostSignificantBits()) + .setSessionIdLSB(sessionId.getLeastSignificantBits()) + .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits()) + .setDeviceIdMSB(device.getId().getId().getMostSignificantBits()) + .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits()) + .setDeviceName(device.getName()) + .setDeviceType(device.getType()) + .setDeviceProfileIdMSB(device.getDeviceProfileId().getId().getMostSignificantBits()) + .setDeviceProfileIdLSB(device.getDeviceProfileId().getId().getLeastSignificantBits()); + + if (device.getCustomerId() != null && !device.getCustomerId().isNullUid()) { + builder.setCustomerIdMSB(device.getCustomerId().getId().getMostSignificantBits()); + builder.setCustomerIdLSB(device.getCustomerId().getId().getLeastSignificantBits()); + } + reportActivity(device.getTenantId(), device.getId(), device.getUuidId(), builder.build()); + } + + private void reportActivity(TenantId tenantId, DeviceId deviceId, UUID routingKey, TransportProtos.SessionInfoProto sessionInfo) { + TransportProtos.SubscriptionInfoProto subscriptionInfoProto = TransportProtos.SubscriptionInfoProto.newBuilder() + .setAttributeSubscription(false).setRpcSubscription(false) + .setLastActivityTime(System.currentTimeMillis()).build(); + TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setSubscriptionInfo(subscriptionInfoProto).build(); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); + tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, + TransportProtos.ToCoreMsg.newBuilder().setToDeviceActorMsg(msg).build()), null); + } + private TbMsgMetaData constructBaseMsgMetadata(TenantId tenantId, EntityId entityId) { TbMsgMetaData metaData = new TbMsgMetaData(); switch (entityId.getEntityType()) { diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index 61bb53b060..a2f72a86d2 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -1339,17 +1339,17 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { String timeseriesKey = "key"; String timeseriesValue = "25"; data.addProperty(timeseriesKey, timeseriesValue); - UplinkMsg.Builder uplinkMsgBuilder1 = UplinkMsg.newBuilder(); + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); EntityDataProto.Builder entityDataBuilder = EntityDataProto.newBuilder(); entityDataBuilder.setPostTelemetryMsg(JsonConverter.convertToTelemetryProto(data, System.currentTimeMillis())); entityDataBuilder.setEntityType(device.getId().getEntityType().name()); entityDataBuilder.setEntityIdMSB(device.getUuidId().getMostSignificantBits()); entityDataBuilder.setEntityIdLSB(device.getUuidId().getLeastSignificantBits()); testAutoGeneratedCodeByProtobuf(entityDataBuilder); - uplinkMsgBuilder1.addEntityData(entityDataBuilder.build()); + uplinkMsgBuilder.addEntityData(entityDataBuilder.build()); - testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder1); - edgeImitator.sendUplinkMsg(uplinkMsgBuilder1.build()); + testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); JsonObject attributesData = new JsonObject(); String attributesKey = "test_attr"; @@ -1381,9 +1381,22 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { String attributeValuesUrl = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/attributes/" + DataConstants.SERVER_SCOPE; List> attributes = doGetAsyncTyped(attributeValuesUrl, new TypeReference<>() {}); - Assert.assertEquals(2, attributes.size()); - var result = attributes.stream().filter(kv -> kv.get("key").equals(attributesKey)).filter(kv -> kv.get("value").equals(attributesValue)).findFirst(); - Assert.assertTrue(result.isPresent()); + + Assert.assertEquals(3, attributes.size()); + + Optional> activeAttributeOpt = getAttributeByKey("active", attributes); + Assert.assertTrue(activeAttributeOpt.isPresent()); + Map activeAttribute = activeAttributeOpt.get(); + Assert.assertEquals("true", activeAttribute.get("value")); + + Optional> customAttributeOpt = getAttributeByKey(attributesKey, attributes); + Assert.assertTrue(customAttributeOpt.isPresent()); + Map customAttribute = customAttributeOpt.get(); + Assert.assertEquals(attributesValue, customAttribute.get("value")); + } + + private Optional> getAttributeByKey(String key, List> attributes) { + return attributes.stream().filter(kv -> kv.get("key").equals(key)).findFirst(); } private Map>> loadDeviceTimeseries(Device device, String timeseriesKey) throws Exception { From 131cff88e47a7abc0833b04756c0a73a5dd03680 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 10 Jun 2022 14:52:47 +0300 Subject: [PATCH 2/8] BaseEdgeTest - Added DELETE attribute after test complete --- .../src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index a2f72a86d2..924bec8282 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -1393,6 +1393,8 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { Assert.assertTrue(customAttributeOpt.isPresent()); Map customAttribute = customAttributeOpt.get(); Assert.assertEquals(attributesValue, customAttribute.get("value")); + + doDelete("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/SERVER_SCOPE?keys=" + attributesKey, String.class); } private Optional> getAttributeByKey(String key, List> attributes) { From 59bcc2f600d162d0d5ff064bb3d309690e2ac52f Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 22 Jun 2022 12:51:55 +0300 Subject: [PATCH 3/8] Code review changes --- .../edge/rpc/processor/BaseEdgeProcessor.java | 10 ---- .../rpc/processor/TelemetryEdgeProcessor.java | 47 +------------------ 2 files changed, 1 insertion(+), 56 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 46b36a1932..09bef32af9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -21,7 +21,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Lazy; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EdgeUtils; @@ -53,8 +52,6 @@ import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; -import org.thingsboard.server.queue.discovery.PartitionService; -import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.service.edge.rpc.constructor.AdminSettingsMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AlarmMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AssetMsgConstructor; @@ -140,13 +137,6 @@ public abstract class BaseEdgeProcessor { @Autowired protected WidgetTypeService widgetTypeService; - @Autowired - protected PartitionService partitionService; - - @Autowired - @Lazy - protected TbQueueProducerProvider producerProvider; - @Autowired protected DataValidator deviceValidator; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java index cc17970ec8..5858024292 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java @@ -52,8 +52,6 @@ import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; -import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.util.JsonUtils; @@ -63,12 +61,9 @@ import org.thingsboard.server.gen.edge.v1.EntityDataProto; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; -import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.util.TbCoreComponent; import javax.annotation.Nullable; -import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -82,13 +77,6 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { private final Gson gson = new Gson(); - private TbQueueProducer> tbCoreMsgProducer; - - @PostConstruct - public void init() { - tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); - } - public List> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData, UUID edgeSessionId) { log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData); List> result = new ArrayList<>(); @@ -107,9 +95,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); } if (EntityType.DEVICE.equals(entityId.getEntityType())) { - Device device = deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())); - // for edge context sessionId is exact edgeSessionId - reportActivity(device, edgeSessionId); + deviceStateService.onDeviceActivity(tenantId, new DeviceId(entityId.getId()), System.currentTimeMillis()); } } if (entityData.hasAttributeDeleteMsg()) { @@ -118,37 +104,6 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { return result; } - private void reportActivity(Device device, UUID sessionId) { - TransportProtos.SessionInfoProto.Builder builder = TransportProtos.SessionInfoProto.newBuilder() - .setSessionIdMSB(sessionId.getMostSignificantBits()) - .setSessionIdLSB(sessionId.getLeastSignificantBits()) - .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits()) - .setDeviceIdMSB(device.getId().getId().getMostSignificantBits()) - .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits()) - .setDeviceName(device.getName()) - .setDeviceType(device.getType()) - .setDeviceProfileIdMSB(device.getDeviceProfileId().getId().getMostSignificantBits()) - .setDeviceProfileIdLSB(device.getDeviceProfileId().getId().getLeastSignificantBits()); - - if (device.getCustomerId() != null && !device.getCustomerId().isNullUid()) { - builder.setCustomerIdMSB(device.getCustomerId().getId().getMostSignificantBits()); - builder.setCustomerIdLSB(device.getCustomerId().getId().getLeastSignificantBits()); - } - reportActivity(device.getTenantId(), device.getId(), device.getUuidId(), builder.build()); - } - - private void reportActivity(TenantId tenantId, DeviceId deviceId, UUID routingKey, TransportProtos.SessionInfoProto sessionInfo) { - TransportProtos.SubscriptionInfoProto subscriptionInfoProto = TransportProtos.SubscriptionInfoProto.newBuilder() - .setAttributeSubscription(false).setRpcSubscription(false) - .setLastActivityTime(System.currentTimeMillis()).build(); - TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setSubscriptionInfo(subscriptionInfoProto).build(); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); - tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, - TransportProtos.ToCoreMsg.newBuilder().setToDeviceActorMsg(msg).build()), null); - } - private TbMsgMetaData constructBaseMsgMetadata(TenantId tenantId, EntityId entityId) { TbMsgMetaData metaData = new TbMsgMetaData(); switch (entityId.getEntityType()) { From 87834c2ccd33c4eefe1967412a48991ac1900ecf Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 22 Jun 2022 12:52:49 +0300 Subject: [PATCH 4/8] Remove unused paramater --- .../thingsboard/server/service/edge/rpc/EdgeGrpcSession.java | 2 +- .../service/edge/rpc/processor/TelemetryEdgeProcessor.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index cb00624f62..1d30d05fea 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -573,7 +573,7 @@ public final class EdgeGrpcSession implements Closeable { try { if (uplinkMsg.getEntityDataCount() > 0) { for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) { - result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), edge.getCustomerId(), entityData, sessionId)); + result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), edge.getCustomerId(), entityData)); } } if (uplinkMsg.getDeviceUpdateMsgCount() > 0) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java index 5858024292..5c807f9434 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java @@ -77,7 +77,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { private final Gson gson = new Gson(); - public List> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData, UUID edgeSessionId) { + public List> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) { log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData); List> result = new ArrayList<>(); EntityId entityId = constructEntityId(entityData); From e551c53b49eeb0248c2cf489728d4f2d53c161b4 Mon Sep 17 00:00:00 2001 From: Viacheslav Klimov Date: Wed, 22 Jun 2022 14:38:02 +0300 Subject: [PATCH 5/8] Swagger docs for EntitiesVersionControlController --- .../controller/ControllerConstants.java | 1 + .../EntitiesVersionControlController.java | 316 +++++++++++++----- 2 files changed, 241 insertions(+), 76 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java index b83d029d95..3e4e2e6e87 100644 --- a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java +++ b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java @@ -137,6 +137,7 @@ public class ControllerConstants { protected static final String EDGE_ASSIGN_RECEIVE_STEP_DESCRIPTION = "(Edge will receive this instantly, if it's currently connected, or once it's going to be connected to platform). "; protected static final String ENTITY_VERSION_TEXT_SEARCH_DESCRIPTION = "The case insensitive 'substring' filter based on the entity version name."; + protected static final String VERSION_ID_PARAM_DESCRIPTION = "Version id, for example fd82625bdd7d6131cf8027b44ee967012ecaf990. Represents commit hash."; protected static final String MARKDOWN_CODE_BLOCK_START = "```json\n"; protected static final String MARKDOWN_CODE_BLOCK_END = "\n```"; diff --git a/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java b/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java index 1bcc304548..11d64b0be4 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java @@ -57,14 +57,19 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import static org.thingsboard.server.controller.ControllerConstants.MARKDOWN_CODE_BLOCK_END; +import static org.thingsboard.server.controller.ControllerConstants.MARKDOWN_CODE_BLOCK_START; import static org.thingsboard.server.controller.ControllerConstants.NEW_LINE; +import static org.thingsboard.server.controller.ControllerConstants.PAGE_DATA_PARAMETERS; import static org.thingsboard.server.controller.ControllerConstants.PAGE_SIZE_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.PAGE_NUMBER_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.ENTITY_VERSION_TEXT_SEARCH_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.SORT_PROPERTY_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.SORT_ORDER_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.SORT_ORDER_ALLOWABLE_VALUES; +import static org.thingsboard.server.controller.ControllerConstants.TENANT_AUTHORITY_PARAGRAPH; import static org.thingsboard.server.controller.ControllerConstants.VC_REQUEST_ID_PARAM_DESCRIPTION; +import static org.thingsboard.server.controller.ControllerConstants.VERSION_ID_PARAM_DESCRIPTION; @RestController @TbCoreComponent @@ -76,9 +81,20 @@ public class EntitiesVersionControlController extends BaseController { private final EntitiesVersionControlService versionControlService; - @ApiOperation(value = "", notes = "" + - "SINGLE_ENTITY:" + NEW_LINE + - "```\n{\n" + + @ApiOperation(value = "Save entities version (saveEntitiesVersion)", notes = "" + + "Creates a new version of entities (or a single entity) by request.\n" + + "Supported entity types: CUSTOMER, ASSET, RULE_CHAIN, DASHBOARD, DEVICE_PROFILE, DEVICE, ENTITY_VIEW, WIDGETS_BUNDLE." + NEW_LINE + + "There are two available types of request: `SINGLE_ENTITY` and `COMPLEX`. " + + "Each of them contains version name (`versionName`) and name of a branch (`branch`) to create version (commit) in. " + + "If specified branch does not exists in a remote repo, then new empty branch will be created. " + + "Request of the `SINGLE_ENTITY` type has id of an entity (`entityId`) and additional configuration (`config`) " + + "which has following options: \n" + + "- `saveRelations` - whether to add inbound and outbound relations of type COMMON to created entity version;\n" + + "- `saveAttributes` - to save attributes of server scope (and also shared scope for devices);\n" + + "- `saveCredentials` - when saving a version of a device, to add its credentials to the version." + NEW_LINE + + "An example of a `SINGLE_ENTITY` version create request:\n" + + MARKDOWN_CODE_BLOCK_START + + "{\n" + " \"type\": \"SINGLE_ENTITY\",\n" + "\n" + " \"versionName\": \"Version 1.0\",\n" + @@ -89,11 +105,25 @@ public class EntitiesVersionControlController extends BaseController { " \"id\": \"b79448e0-d4f4-11ec-847b-0f432358ab48\"\n" + " },\n" + " \"config\": {\n" + - " \"saveRelations\": true\n" + + " \"saveRelations\": true,\n" + + " \"saveAttributes\": true,\n" + + " \"saveCredentials\": false\n" + " }\n" + - "}\n```" + NEW_LINE + - "COMPLEX:" + NEW_LINE + - "```\n{\n" + + "}" + + MARKDOWN_CODE_BLOCK_END + NEW_LINE + + "Second request type (`COMPLEX`), additionally to `branch` and `versionName`, contains following properties:\n" + + "- `entityTypes` - a structure with entity types to export and configuration for each entity type; " + + " this configuration has all the options available for `SINGLE_ENTITY` and additionally has these ones: \n" + + " - `allEntities` and `entityIds` - if you want to save the version of all entities of the entity type " + + " then set `allEntities` param to true, otherwise set it to false and specify the list of specific entities (`entityIds`);\n" + + " - `syncStrategy` - synchronization strategy to use for this entity type: when set to `OVERWRITE` " + + " then the list of remote entities of this type will be overwritten by newly added entities. If set to " + + " `MERGE` - existing remote entities of this entity type will not be removed, new entities will just " + + " be added on top (or existing remote entities will be updated).\n" + + "- `syncStrategy` - default synchronization strategy to use when it is not specified for an entity type." + NEW_LINE + + "Example for this type of request:\n" + + MARKDOWN_CODE_BLOCK_START + + "{\n" + " \"type\": \"COMPLEX\",\n" + "\n" + " \"versionName\": \"Devices and profiles: release 2\",\n" + @@ -104,7 +134,9 @@ public class EntitiesVersionControlController extends BaseController { " \"DEVICE\": {\n" + " \"syncStrategy\": null,\n" + " \"allEntities\": true,\n" + - " \"saveRelations\": true\n" + + " \"saveRelations\": true,\n" + + " \"saveAttributes\": true,\n" + + " \"saveCredentials\": true\n" + " },\n" + " \"DEVICE_PROFILE\": {\n" + " \"syncStrategy\": \"MERGE\",\n" + @@ -115,7 +147,11 @@ public class EntitiesVersionControlController extends BaseController { " \"saveRelations\": true\n" + " }\n" + " }\n" + - "}\n```") + "}" + + MARKDOWN_CODE_BLOCK_END + NEW_LINE + + "Response wil contain generated request UUID, that can be then used to retrieve " + + "status of operation via `getVersionCreateRequestStatus`.\n" + + TENANT_AUTHORITY_PARAGRAPH) @PostMapping("/version") public DeferredResult saveEntitiesVersion(@RequestBody VersionCreateRequest request) throws Exception { SecurityUser user = getCurrentUser(); @@ -123,7 +159,32 @@ public class EntitiesVersionControlController extends BaseController { return wrapFuture(versionControlService.saveEntitiesVersion(user, request)); } - @ApiOperation(value = "", notes = "") + @ApiOperation(value = "Get version create request status (getVersionCreateRequestStatus)", notes = "" + + "Returns the status of previously made version create request. " + NEW_LINE + + "This status contains following properties:\n" + + "- `done` - whether request processing is finished;\n" + + "- `version` - created version info: timestamp, version id (commit hash), commit name and commit author;\n" + + "- `added` - count of items that were created in the remote repo;\n" + + "- `modified` - modified items count;\n" + + "- `removed` - removed items count;\n" + + "- `error` - error message, if an error occurred while handling the request." + NEW_LINE + + "An example of successful status:\n" + + MARKDOWN_CODE_BLOCK_START + + "{\n" + + " \"done\": true,\n" + + " \"added\": 10,\n" + + " \"modified\": 2,\n" + + " \"removed\": 5,\n" + + " \"version\": {\n" + + " \"timestamp\": 1655198528000,\n" + + " \"id\":\"8a834dd389ed80e0759ba8ee338b3f1fd160a114\",\n" + + " \"name\": \"My devices v2.0\",\n" + + " \"author\": \"John Doe\"\n" + + " },\n" + + " \"error\": null\n" + + "}" + + MARKDOWN_CODE_BLOCK_END + + TENANT_AUTHORITY_PARAGRAPH) @GetMapping(value = "/version/{requestId}/status") public VersionCreationResult getVersionCreateRequestStatus(@ApiParam(value = VC_REQUEST_ID_PARAM_DESCRIPTION, required = true) @PathVariable UUID requestId) throws Exception { @@ -131,13 +192,42 @@ public class EntitiesVersionControlController extends BaseController { return versionControlService.getVersionCreateStatus(getCurrentUser(), requestId); } - @ApiOperation(value = "", notes = "" + - "```\n[\n" + - " {\n" + - " \"id\": \"c30c8bcaed3f0813649f0dee51a89d04d0a12b28\",\n" + - " \"name\": \"Device profile 1 version 1.0\"\n" + - " }\n" + - "]\n```") + @ApiOperation(value = "List entity versions (listEntityVersions)", notes = "" + + "Returns list of versions for a specific entity in a concrete branch. \n" + + "You need to specify external id of an entity to list versions for. This is `externalId` property of an entity, " + + "or otherwise if not set - simply id of this entity. \n" + + "If specified branch does not exist - empty page data will be returned. " + NEW_LINE + + "Each version info item has timestamp, id, name and author. Version id can then be used to restore the version. " + + PAGE_DATA_PARAMETERS + NEW_LINE + + "Response example: \n" + + MARKDOWN_CODE_BLOCK_START + + "{\n" + + " \"data\": [\n" + + " {\n" + + " \"timestamp\": 1655198593000,\n" + + " \"id\": \"fd82625bdd7d6131cf8027b44ee967012ecaf990\",\n" + + " \"name\": \"Devices and assets - v2.0\",\n" + + " \"author\": \"John Doe \"\n" + + " },\n" + + " {\n" + + " \"timestamp\": 1655198528000,\n" + + " \"id\": \"682adcffa9c8a2f863af6f00c4850323acbd4219\",\n" + + " \"name\": \"Update my device\",\n" + + " \"author\": \"John Doe \"\n" + + " },\n" + + " {\n" + + " \"timestamp\": 1655198280000,\n" + + " \"id\": \"d2a6087c2b30e18cc55e7cdda345a8d0dfb959a4\",\n" + + " \"name\": \"Devices and assets - v1.0\",\n" + + " \"author\": \"John Doe \"\n" + + " }\n" + + " ],\n" + + " \"totalPages\": 1,\n" + + " \"totalElements\": 3,\n" + + " \"hasNext\": false\n" + + "}" + + MARKDOWN_CODE_BLOCK_END + + TENANT_AUTHORITY_PARAGRAPH) @GetMapping(value = "/version/{branch}/{entityType}/{externalEntityUuid}", params = {"pageSize", "page"}) public DeferredResult> listEntityVersions(@PathVariable String branch, @PathVariable EntityType entityType, @@ -158,13 +248,12 @@ public class EntitiesVersionControlController extends BaseController { return wrapFuture(versionControlService.listEntityVersions(getTenantId(), branch, externalEntityId, pageLink)); } - @ApiOperation(value = "", notes = "" + - "```\n[\n" + - " {\n" + - " \"id\": \"c30c8bcaed3f0813649f0dee51a89d04d0a12b28\",\n" + - " \"name\": \"Device profiles from dev\"\n" + - " }\n" + - "]\n```") + @ApiOperation(value = "List entity type versions (listEntityTypeVersions)", notes = "" + + "Returns list of versions of an entity type in a branch. This is a collected list of versions that were created " + + "for entities of this type in a remote branch. \n" + + "If specified branch does not exist - empty page data will be returned. " + + "The response structure is the same as for `listEntityVersions` API method." + + TENANT_AUTHORITY_PARAGRAPH) @GetMapping(value = "/version/{branch}/{entityType}", params = {"pageSize", "page"}) public DeferredResult> listEntityTypeVersions(@PathVariable String branch, @PathVariable EntityType entityType, @@ -183,21 +272,11 @@ public class EntitiesVersionControlController extends BaseController { return wrapFuture(versionControlService.listEntityTypeVersions(getTenantId(), branch, entityType, pageLink)); } - @ApiOperation(value = "", notes = "" + - "```\n[\n" + - " {\n" + - " \"id\": \"ba9baaca1742b730e7331f31a6a51da5fc7da7f7\",\n" + - " \"name\": \"Device 1 removed\"\n" + - " },\n" + - " {\n" + - " \"id\": \"b3c28d722d328324c7c15b0b30047b0c40011cf7\",\n" + - " \"name\": \"Device profiles added\"\n" + - " },\n" + - " {\n" + - " \"id\": \"c30c8bcaed3f0813649f0dee51a89d04d0a12b28\",\n" + - " \"name\": \"Devices added\"\n" + - " }\n" + - "]\n```") + @ApiOperation(value = "List all versions (listVersions)", notes = "" + + "Lists all available versions in a branch for all entity types. \n" + + "If specified branch does not exist - empty page data will be returned. " + + "The response format is the same as for `listEntityVersions` API method." + + TENANT_AUTHORITY_PARAGRAPH) @GetMapping(value = "/version/{branch}", params = {"pageSize", "page"}) public DeferredResult> listVersions(@PathVariable String branch, @ApiParam(value = PAGE_SIZE_DESCRIPTION, required = true) @@ -216,23 +295,42 @@ public class EntitiesVersionControlController extends BaseController { } + @ApiOperation(value = "List entities at version (listEntitiesAtVersion)", notes = "" + + "Returns a list of remote entities of a specific entity type that are available at a concrete version. \n" + + "Each entity item in the result has `externalId` property. " + + "Entities order will be the same as in the repository." + + TENANT_AUTHORITY_PARAGRAPH) @GetMapping("/entity/{branch}/{entityType}/{versionId}") public DeferredResult> listEntitiesAtVersion(@PathVariable String branch, @PathVariable EntityType entityType, + @ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true) @PathVariable String versionId) throws Exception { accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ); return wrapFuture(versionControlService.listEntitiesAtVersion(getTenantId(), branch, versionId, entityType)); } + @ApiOperation(value = "List all entities at version (listAllEntitiesAtVersion)", notes = "" + + "Returns a list of all remote entities available in a specific version. " + + "Response type is the same as for listAllEntitiesAtVersion API method. \n" + + "Returned entities order will be the same as in the repository." + + TENANT_AUTHORITY_PARAGRAPH) @GetMapping("/entity/{branch}/{versionId}") public DeferredResult> listAllEntitiesAtVersion(@PathVariable String branch, + @ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true) @PathVariable String versionId) throws Exception { accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ); return wrapFuture(versionControlService.listAllEntitiesAtVersion(getTenantId(), branch, versionId)); } + @ApiOperation(value = "Get entity data info (getEntityDataInfo)", notes = "" + + "Retrieves short info about the remote entity by external id at a concrete version. \n" + + "Returned entity data info contains following properties: " + + "`hasRelations` (whether stored entity data contains relations), `hasAttributes` (contains attributes) and " + + "`hasCredentials` (whether stored device data has credentials)." + + TENANT_AUTHORITY_PARAGRAPH) @GetMapping("/info/{versionId}/{entityType}/{externalEntityUuid}") - public DeferredResult getEntityDataInfo(@PathVariable String versionId, + public DeferredResult getEntityDataInfo(@ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true) + @PathVariable String versionId, @PathVariable EntityType entityType, @PathVariable UUID externalEntityUuid) throws Exception { accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ); @@ -240,19 +338,33 @@ public class EntitiesVersionControlController extends BaseController { return wrapFuture(versionControlService.getEntityDataInfo(getCurrentUser(), entityId, versionId)); } + @ApiOperation(value = "Compare entity data to version (compareEntityDataToVersion)", notes = "" + + "Returns an object with current entity data and the one at a specific version. " + + "Entity data structure is the same as stored in a repository. " + + TENANT_AUTHORITY_PARAGRAPH) @GetMapping("/diff/{branch}/{entityType}/{internalEntityUuid}") public DeferredResult compareEntityDataToVersion(@PathVariable String branch, @PathVariable EntityType entityType, @PathVariable UUID internalEntityUuid, + @ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true) @RequestParam String versionId) throws Exception { accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ); EntityId entityId = EntityIdFactory.getByTypeAndUuid(entityType, internalEntityUuid); return wrapFuture(versionControlService.compareEntityDataToVersion(getCurrentUser(), branch, entityId, versionId)); } - @ApiOperation(value = "", notes = "" + - "SINGLE_ENTITY:" + NEW_LINE + - "```\n{\n" + + @ApiOperation(value = "Load entities version (loadEntitiesVersion)", notes = "" + + "Loads specific version of remote entities (or single entity) by request. " + + "Supported entity types: CUSTOMER, ASSET, RULE_CHAIN, DASHBOARD, DEVICE_PROFILE, DEVICE, ENTITY_VIEW, WIDGETS_BUNDLE." + NEW_LINE + + "There are multiple types of request. Each of them requires branch name (`branch`) and version id (`versionId`). " + + "Request of type `SINGLE_ENTITY` is needed to restore a concrete version of a specific entity. It contains " + + "id of a remote entity (`externalEntityId`) and additional configuration (`config`):\n" + + "- `loadRelations` - to update relations list (in case `saveRelations` option was enabled during version creation);\n" + + "- `loadAttributes` - to load entity attributes (if `saveAttributes` config option was enabled);\n" + + "- `loadCredentials` - to update device credentials (if `saveCredentials` option was enabled during version creation)." + NEW_LINE + + "An example of such request:\n" + + MARKDOWN_CODE_BLOCK_START + + "{\n" + " \"type\": \"SINGLE_ENTITY\",\n" + " \n" + " \"branch\": \"dev\",\n" + @@ -264,11 +376,23 @@ public class EntitiesVersionControlController extends BaseController { " },\n" + " \"config\": {\n" + " \"loadRelations\": false,\n" + - " \"findExistingEntityByName\": false\n" + + " \"loadAttributes\": true,\n" + + " \"loadCredentials\": true\n" + " }\n" + - "}\n```" + NEW_LINE + - "ENTITY_TYPE:" + NEW_LINE + - "```\n{\n" + + "}" + + MARKDOWN_CODE_BLOCK_END + NEW_LINE + + "Another request type (`ENTITY_TYPE`) is needed to load specific version of the whole entity types. " + + "It contains a structure with entity types to load and configs for each entity type (`entityTypes`). " + + "For each specified entity type, the method will load all remote entities of this type that are present " + + "at the version. A config for each entity type contains the same options as in `SINGLE_ENTITY` request type, and " + + "additionally contains following options:\n" + + "- `removeOtherEntities` - to remove local entities that are not present on the remote - basically to " + + " overwrite local entity type with the remote one;\n" + + "- `findExistingEntityByName` - when you are loading some remote entities that are not yet present at this tenant, " + + " try to find existing entity by name and update it rather than create new." + NEW_LINE + + "Here is an example of the request to completely restore version of the whole device entity type:\n" + + MARKDOWN_CODE_BLOCK_START + + "{\n" + " \"type\": \"ENTITY_TYPE\",\n" + "\n" + " \"branch\": \"dev\",\n" + @@ -276,12 +400,18 @@ public class EntitiesVersionControlController extends BaseController { "\n" + " \"entityTypes\": {\n" + " \"DEVICE\": {\n" + - " \"loadRelations\": false,\n" + + " \"removeOtherEntities\": true,\n" + " \"findExistingEntityByName\": false,\n" + - " \"removeOtherEntities\": true\n" + + " \"loadRelations\": true,\n" + + " \"loadAttributes\": true,\n" + + " \"loadCredentials\": true\n" + " }\n" + " }\n" + - "}\n```") + "}" + + MARKDOWN_CODE_BLOCK_END + NEW_LINE + + "The response will contain generated request UUID that is to be used to check the status of operation " + + "via `getVersionLoadRequestStatus`." + + TENANT_AUTHORITY_PARAGRAPH) @PostMapping("/entity") public UUID loadEntitiesVersion(@RequestBody VersionLoadRequest request) throws Exception { SecurityUser user = getCurrentUser(); @@ -289,17 +419,54 @@ public class EntitiesVersionControlController extends BaseController { return versionControlService.loadEntitiesVersion(user, request); } - @ApiOperation(value = "", notes = "") + @ApiOperation(value = "Get version load request status (getVersionLoadRequestStatus)", notes = "" + + "Returns the status of previously made version load request. " + + "The structure contains following parameters:\n" + + "- `done` - if the request was successfully processed;\n" + + "- `result` - a list of load results for each entity type:\n" + + " - `created` - created entities count;\n" + + " - `updated` - updated entities count;\n" + + " - `deleted` - removed entities count.\n" + + "- `error` - if an error occurred during processing, error info:\n" + + " - `type` - error type;\n" + + " - `source` - an external id of remote entity;\n" + + " - `target` - if failed to find referenced entity by external id - this external id;\n" + + " - `message` - error message." + NEW_LINE + + "An example of successfully processed request status:\n" + + MARKDOWN_CODE_BLOCK_START + + "{\n" + + " \"done\": true,\n" + + " \"result\": [\n" + + " {\n" + + " \"entityType\": \"DEVICE\",\n" + + " \"created\": 10,\n" + + " \"updated\": 5,\n" + + " \"deleted\": 5\n" + + " },\n" + + " {\n" + + " \"entityType\": \"ASSET\",\n" + + " \"created\": 4,\n" + + " \"updated\": 0,\n" + + " \"deleted\": 8\n" + + " }\n" + + " ]\n" + + "}" + + MARKDOWN_CODE_BLOCK_END + + TENANT_AUTHORITY_PARAGRAPH + ) @GetMapping(value = "/entity/{requestId}/status") public VersionLoadResult getVersionLoadRequestStatus(@ApiParam(value = VC_REQUEST_ID_PARAM_DESCRIPTION, required = true) - @PathVariable UUID requestId) throws Exception { + @PathVariable UUID requestId) throws Exception { accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ); return versionControlService.getVersionLoadStatus(getCurrentUser(), requestId); } - @ApiOperation(value = "", notes = "" + - "```\n[\n" + + @ApiOperation(value = "List branches (listBranches)", notes = "" + + "Lists branches available in the remote repository. \n\n" + + "Response example: \n" + + MARKDOWN_CODE_BLOCK_START + + "[\n" + " {\n" + " \"name\": \"master\",\n" + " \"default\": true\n" + @@ -312,31 +479,28 @@ public class EntitiesVersionControlController extends BaseController { " \"name\": \"dev-2\",\n" + " \"default\": false\n" + " }\n" + - "]\n\n```") + "]" + + MARKDOWN_CODE_BLOCK_END) @GetMapping("/branches") - public DeferredResult> listBranches() throws ThingsboardException { - try { - accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ); - final TenantId tenantId = getTenantId(); - ListenableFuture> branches = versionControlService.listBranches(tenantId); - return wrapFuture(Futures.transform(branches, remoteBranches -> { - List infos = new ArrayList<>(); + public DeferredResult> listBranches() throws Exception { + accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ); + final TenantId tenantId = getTenantId(); + ListenableFuture> branches = versionControlService.listBranches(tenantId); + return wrapFuture(Futures.transform(branches, remoteBranches -> { + List infos = new ArrayList<>(); - String defaultBranch = versionControlService.getVersionControlSettings(tenantId).getDefaultBranch(); - if (StringUtils.isNotEmpty(defaultBranch)) { - infos.add(new BranchInfo(defaultBranch, true)); + String defaultBranch = versionControlService.getVersionControlSettings(tenantId).getDefaultBranch(); + if (StringUtils.isNotEmpty(defaultBranch)) { + infos.add(new BranchInfo(defaultBranch, true)); + } + + remoteBranches.forEach(branch -> { + if (!branch.equals(defaultBranch)) { + infos.add(new BranchInfo(branch, false)); } - - remoteBranches.forEach(branch -> { - if (!branch.equals(defaultBranch)) { - infos.add(new BranchInfo(branch, false)); - } - }); - return infos; - }, MoreExecutors.directExecutor())); - } catch (Exception e) { - throw handleException(e); - } + }); + return infos; + }, MoreExecutors.directExecutor())); } @Data From a3944c50477c69748022922f3663ce6acbe808a4 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 22 Jun 2022 15:56:50 +0300 Subject: [PATCH 6/8] Introduced new proto for device activity --- .../device/DeviceActorMessageProcessor.java | 7 +++++ .../edge/rpc/processor/BaseEdgeProcessor.java | 10 +++++++ .../rpc/processor/TelemetryEdgeProcessor.java | 30 ++++++++++++++++++- common/cluster-api/src/main/proto/queue.proto | 5 ++++ 4 files changed, 51 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index b0830317a7..28c7a6a499 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -418,6 +418,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { if (msg.hasUplinkNotificationMsg()) { processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg()); } + if (msg.hasDeviceActivity()) { + handleDeviceActivity(context, msg.getDeviceActivity()); + } callback.onSuccess(); } @@ -729,6 +732,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } + private void handleDeviceActivity(TbActorCtx context, TransportProtos.DeviceActivityProto deviceActivityProto) { + systemContext.getDeviceStateService().onDeviceActivity(tenantId, deviceId, deviceActivityProto.getLastActivityTime()); + } + void processCredentialsUpdate(TbActorMsg msg) { if (((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials().getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) { sessions.forEach((k, v) -> { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index c67f7a5a14..8f37d743b5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EdgeUtils; @@ -53,6 +54,8 @@ import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.service.edge.rpc.constructor.AdminSettingsMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AlarmMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AssetMsgConstructor; @@ -142,6 +145,13 @@ public abstract class BaseEdgeProcessor { @Autowired protected OtaPackageService otaPackageService; + @Autowired + protected PartitionService partitionService; + + @Autowired + @Lazy + protected TbQueueProducerProvider producerProvider; + @Autowired protected DataValidator deviceValidator; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java index 5c807f9434..88ea417c24 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java @@ -52,6 +52,8 @@ import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.util.JsonUtils; @@ -61,9 +63,12 @@ import org.thingsboard.server.gen.edge.v1.EntityDataProto; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.util.TbCoreComponent; import javax.annotation.Nullable; +import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -77,6 +82,13 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { private final Gson gson = new Gson(); + private TbQueueProducer> tbCoreMsgProducer; + + @PostConstruct + public void init() { + tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); + } + public List> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) { log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData); List> result = new ArrayList<>(); @@ -95,7 +107,23 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); } if (EntityType.DEVICE.equals(entityId.getEntityType())) { - deviceStateService.onDeviceActivity(tenantId, new DeviceId(entityId.getId()), System.currentTimeMillis()); + DeviceId deviceId = new DeviceId(entityId.getId()); + + TransportProtos.SessionInfoProto sessionInfo = TransportProtos.SessionInfoProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()).build(); + + TransportProtos.DeviceActivityProto deviceActivityProto = TransportProtos.DeviceActivityProto.newBuilder() + .setLastActivityTime(System.currentTimeMillis()).build(); + + TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setDeviceActivity(deviceActivityProto).build(); + + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); + tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(), + TransportProtos.ToCoreMsg.newBuilder().setToDeviceActorMsg(msg).build()), null); } } if (entityData.hasAttributeDeleteMsg()) { diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 3079978bce..7fefe18d4a 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -454,6 +454,10 @@ message GetOtaPackageResponseMsg { string fileName = 8; } +message DeviceActivityProto { + int64 lastActivityTime = 1; +} + //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. message SubscriptionInfoProto { int64 lastActivityTime = 1; @@ -483,6 +487,7 @@ message TransportToDeviceActorMsg { ToDeviceRpcResponseStatusMsg rpcResponseStatusMsg = 10; SendPendingRPCMsg sendPendingRPC = 11; UplinkNotificationMsg uplinkNotificationMsg = 12; + DeviceActivityProto deviceActivity = 13; } message TransportToRuleEngineMsg { From 731fb1eae893006d5c9a5b5deee527d8fb071447 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 22 Jun 2022 19:42:00 +0300 Subject: [PATCH 7/8] Code review changes: avoid creating device actor if not needed --- .../device/DeviceActorMessageProcessor.java | 7 ------- .../rpc/processor/TelemetryEdgeProcessor.java | 11 +++-------- .../queue/DefaultTbCoreConsumerService.java | 18 ++++++++++++++++++ .../service/queue/TbCoreConsumerStats.java | 5 +++++ common/cluster-api/src/main/proto/queue.proto | 8 ++++++-- 5 files changed, 32 insertions(+), 17 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 28c7a6a499..b0830317a7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -418,9 +418,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { if (msg.hasUplinkNotificationMsg()) { processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg()); } - if (msg.hasDeviceActivity()) { - handleDeviceActivity(context, msg.getDeviceActivity()); - } callback.onSuccess(); } @@ -732,10 +729,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } - private void handleDeviceActivity(TbActorCtx context, TransportProtos.DeviceActivityProto deviceActivityProto) { - systemContext.getDeviceStateService().onDeviceActivity(tenantId, deviceId, deviceActivityProto.getLastActivityTime()); - } - void processCredentialsUpdate(TbActorMsg msg) { if (((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials().getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) { sessions.forEach((k, v) -> { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java index 88ea417c24..f30111e1ef 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java @@ -109,21 +109,16 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { if (EntityType.DEVICE.equals(entityId.getEntityType())) { DeviceId deviceId = new DeviceId(entityId.getId()); - TransportProtos.SessionInfoProto sessionInfo = TransportProtos.SessionInfoProto.newBuilder() + TransportProtos.DeviceActivityProto deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) - .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()).build(); - - TransportProtos.DeviceActivityProto deviceActivityProto = TransportProtos.DeviceActivityProto.newBuilder() + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) .setLastActivityTime(System.currentTimeMillis()).build(); - TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setDeviceActivity(deviceActivityProto).build(); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(), - TransportProtos.ToCoreMsg.newBuilder().setToDeviceActorMsg(msg).build()), null); + TransportProtos.ToCoreMsg.newBuilder().setDeviceActivityMsg(deviceActivityMsg).build()), null); } } if (entityData.hasAttributeDeleteMsg()) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index d071d591a1..a3bae8bc4e 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -28,6 +28,7 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.msg.MsgType; @@ -236,6 +237,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray()); if (actorMsg.isPresent()) { @@ -520,6 +524,20 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService Date: Wed, 22 Jun 2022 19:54:10 +0300 Subject: [PATCH 8/8] Introduced new counter for device activities --- .../server/service/queue/TbCoreConsumerStats.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java index 8f12cbb78a..11740aeb79 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java @@ -38,6 +38,7 @@ public class TbCoreConsumerStats { public static final String SUBSCRIPTION_MSGS = "subMsgs"; public static final String TO_CORE_NOTIFICATIONS = "coreNfs"; public static final String EDGE_NOTIFICATIONS = "edgeNfs"; + public static final String DEVICE_ACTIVITIES = "deviceActivity"; private final StatsCounter totalCounter; private final StatsCounter sessionEventCounter; @@ -52,6 +53,7 @@ public class TbCoreConsumerStats { private final StatsCounter subscriptionMsgCounter; private final StatsCounter toCoreNotificationsCounter; private final StatsCounter edgeNotificationsCounter; + private final StatsCounter deviceActivitiesCounter; private final List counters = new ArrayList<>(); @@ -70,6 +72,7 @@ public class TbCoreConsumerStats { this.subscriptionMsgCounter = register(statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS)); this.toCoreNotificationsCounter = register(statsFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS)); this.edgeNotificationsCounter = register(statsFactory.createStatsCounter(statsKey, EDGE_NOTIFICATIONS)); + this.deviceActivitiesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_ACTIVITIES)); } private StatsCounter register(StatsCounter counter){ @@ -114,7 +117,7 @@ public class TbCoreConsumerStats { public void log(TransportProtos.DeviceActivityProto msg) { totalCounter.increment(); - deviceStateCounter.increment(); + deviceActivitiesCounter.increment(); } public void log(TransportProtos.SubscriptionMgrMsgProto msg) {