diff --git a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java index c5d3483faf..6a616671eb 100644 --- a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java +++ b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java @@ -137,6 +137,7 @@ public class ControllerConstants { protected static final String EDGE_ASSIGN_RECEIVE_STEP_DESCRIPTION = "(Edge will receive this instantly, if it's currently connected, or once it's going to be connected to platform). "; protected static final String ENTITY_VERSION_TEXT_SEARCH_DESCRIPTION = "The case insensitive 'substring' filter based on the entity version name."; + protected static final String VERSION_ID_PARAM_DESCRIPTION = "Version id, for example fd82625bdd7d6131cf8027b44ee967012ecaf990. Represents commit hash."; protected static final String MARKDOWN_CODE_BLOCK_START = "```json\n"; protected static final String MARKDOWN_CODE_BLOCK_END = "\n```"; diff --git a/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java b/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java index fff3c6a4bb..33e7e11a3f 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java @@ -58,14 +58,19 @@ import java.util.List; import java.util.UUID; import java.util.stream.Collectors; +import static org.thingsboard.server.controller.ControllerConstants.MARKDOWN_CODE_BLOCK_END; +import static org.thingsboard.server.controller.ControllerConstants.MARKDOWN_CODE_BLOCK_START; import static org.thingsboard.server.controller.ControllerConstants.NEW_LINE; +import static org.thingsboard.server.controller.ControllerConstants.PAGE_DATA_PARAMETERS; import static org.thingsboard.server.controller.ControllerConstants.PAGE_SIZE_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.PAGE_NUMBER_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.ENTITY_VERSION_TEXT_SEARCH_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.SORT_PROPERTY_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.SORT_ORDER_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.SORT_ORDER_ALLOWABLE_VALUES; +import static org.thingsboard.server.controller.ControllerConstants.TENANT_AUTHORITY_PARAGRAPH; import static org.thingsboard.server.controller.ControllerConstants.VC_REQUEST_ID_PARAM_DESCRIPTION; +import static org.thingsboard.server.controller.ControllerConstants.VERSION_ID_PARAM_DESCRIPTION; @RestController @TbCoreComponent @@ -77,9 +82,20 @@ public class EntitiesVersionControlController extends BaseController { private final EntitiesVersionControlService versionControlService; - @ApiOperation(value = "", notes = "" + - "SINGLE_ENTITY:" + NEW_LINE + - "```\n{\n" + + @ApiOperation(value = "Save entities version (saveEntitiesVersion)", notes = "" + + "Creates a new version of entities (or a single entity) by request.\n" + + "Supported entity types: CUSTOMER, ASSET, RULE_CHAIN, DASHBOARD, DEVICE_PROFILE, DEVICE, ENTITY_VIEW, WIDGETS_BUNDLE." + NEW_LINE + + "There are two available types of request: `SINGLE_ENTITY` and `COMPLEX`. " + + "Each of them contains version name (`versionName`) and name of a branch (`branch`) to create version (commit) in. " + + "If specified branch does not exists in a remote repo, then new empty branch will be created. " + + "Request of the `SINGLE_ENTITY` type has id of an entity (`entityId`) and additional configuration (`config`) " + + "which has following options: \n" + + "- `saveRelations` - whether to add inbound and outbound relations of type COMMON to created entity version;\n" + + "- `saveAttributes` - to save attributes of server scope (and also shared scope for devices);\n" + + "- `saveCredentials` - when saving a version of a device, to add its credentials to the version." + NEW_LINE + + "An example of a `SINGLE_ENTITY` version create request:\n" + + MARKDOWN_CODE_BLOCK_START + + "{\n" + " \"type\": \"SINGLE_ENTITY\",\n" + "\n" + " \"versionName\": \"Version 1.0\",\n" + @@ -90,11 +106,25 @@ public class EntitiesVersionControlController extends BaseController { " \"id\": \"b79448e0-d4f4-11ec-847b-0f432358ab48\"\n" + " },\n" + " \"config\": {\n" + - " \"saveRelations\": true\n" + + " \"saveRelations\": true,\n" + + " \"saveAttributes\": true,\n" + + " \"saveCredentials\": false\n" + " }\n" + - "}\n```" + NEW_LINE + - "COMPLEX:" + NEW_LINE + - "```\n{\n" + + "}" + + MARKDOWN_CODE_BLOCK_END + NEW_LINE + + "Second request type (`COMPLEX`), additionally to `branch` and `versionName`, contains following properties:\n" + + "- `entityTypes` - a structure with entity types to export and configuration for each entity type; " + + " this configuration has all the options available for `SINGLE_ENTITY` and additionally has these ones: \n" + + " - `allEntities` and `entityIds` - if you want to save the version of all entities of the entity type " + + " then set `allEntities` param to true, otherwise set it to false and specify the list of specific entities (`entityIds`);\n" + + " - `syncStrategy` - synchronization strategy to use for this entity type: when set to `OVERWRITE` " + + " then the list of remote entities of this type will be overwritten by newly added entities. If set to " + + " `MERGE` - existing remote entities of this entity type will not be removed, new entities will just " + + " be added on top (or existing remote entities will be updated).\n" + + "- `syncStrategy` - default synchronization strategy to use when it is not specified for an entity type." + NEW_LINE + + "Example for this type of request:\n" + + MARKDOWN_CODE_BLOCK_START + + "{\n" + " \"type\": \"COMPLEX\",\n" + "\n" + " \"versionName\": \"Devices and profiles: release 2\",\n" + @@ -105,7 +135,9 @@ public class EntitiesVersionControlController extends BaseController { " \"DEVICE\": {\n" + " \"syncStrategy\": null,\n" + " \"allEntities\": true,\n" + - " \"saveRelations\": true\n" + + " \"saveRelations\": true,\n" + + " \"saveAttributes\": true,\n" + + " \"saveCredentials\": true\n" + " },\n" + " \"DEVICE_PROFILE\": {\n" + " \"syncStrategy\": \"MERGE\",\n" + @@ -116,7 +148,11 @@ public class EntitiesVersionControlController extends BaseController { " \"saveRelations\": true\n" + " }\n" + " }\n" + - "}\n```") + "}" + + MARKDOWN_CODE_BLOCK_END + NEW_LINE + + "Response wil contain generated request UUID, that can be then used to retrieve " + + "status of operation via `getVersionCreateRequestStatus`.\n" + + TENANT_AUTHORITY_PARAGRAPH) @PostMapping("/version") public DeferredResult saveEntitiesVersion(@RequestBody VersionCreateRequest request) throws Exception { SecurityUser user = getCurrentUser(); @@ -124,7 +160,32 @@ public class EntitiesVersionControlController extends BaseController { return wrapFuture(versionControlService.saveEntitiesVersion(user, request)); } - @ApiOperation(value = "", notes = "") + @ApiOperation(value = "Get version create request status (getVersionCreateRequestStatus)", notes = "" + + "Returns the status of previously made version create request. " + NEW_LINE + + "This status contains following properties:\n" + + "- `done` - whether request processing is finished;\n" + + "- `version` - created version info: timestamp, version id (commit hash), commit name and commit author;\n" + + "- `added` - count of items that were created in the remote repo;\n" + + "- `modified` - modified items count;\n" + + "- `removed` - removed items count;\n" + + "- `error` - error message, if an error occurred while handling the request." + NEW_LINE + + "An example of successful status:\n" + + MARKDOWN_CODE_BLOCK_START + + "{\n" + + " \"done\": true,\n" + + " \"added\": 10,\n" + + " \"modified\": 2,\n" + + " \"removed\": 5,\n" + + " \"version\": {\n" + + " \"timestamp\": 1655198528000,\n" + + " \"id\":\"8a834dd389ed80e0759ba8ee338b3f1fd160a114\",\n" + + " \"name\": \"My devices v2.0\",\n" + + " \"author\": \"John Doe\"\n" + + " },\n" + + " \"error\": null\n" + + "}" + + MARKDOWN_CODE_BLOCK_END + + TENANT_AUTHORITY_PARAGRAPH) @GetMapping(value = "/version/{requestId}/status") public VersionCreationResult getVersionCreateRequestStatus(@ApiParam(value = VC_REQUEST_ID_PARAM_DESCRIPTION, required = true) @PathVariable UUID requestId) throws Exception { @@ -132,13 +193,42 @@ public class EntitiesVersionControlController extends BaseController { return versionControlService.getVersionCreateStatus(getCurrentUser(), requestId); } - @ApiOperation(value = "", notes = "" + - "```\n[\n" + - " {\n" + - " \"id\": \"c30c8bcaed3f0813649f0dee51a89d04d0a12b28\",\n" + - " \"name\": \"Device profile 1 version 1.0\"\n" + - " }\n" + - "]\n```") + @ApiOperation(value = "List entity versions (listEntityVersions)", notes = "" + + "Returns list of versions for a specific entity in a concrete branch. \n" + + "You need to specify external id of an entity to list versions for. This is `externalId` property of an entity, " + + "or otherwise if not set - simply id of this entity. \n" + + "If specified branch does not exist - empty page data will be returned. " + NEW_LINE + + "Each version info item has timestamp, id, name and author. Version id can then be used to restore the version. " + + PAGE_DATA_PARAMETERS + NEW_LINE + + "Response example: \n" + + MARKDOWN_CODE_BLOCK_START + + "{\n" + + " \"data\": [\n" + + " {\n" + + " \"timestamp\": 1655198593000,\n" + + " \"id\": \"fd82625bdd7d6131cf8027b44ee967012ecaf990\",\n" + + " \"name\": \"Devices and assets - v2.0\",\n" + + " \"author\": \"John Doe \"\n" + + " },\n" + + " {\n" + + " \"timestamp\": 1655198528000,\n" + + " \"id\": \"682adcffa9c8a2f863af6f00c4850323acbd4219\",\n" + + " \"name\": \"Update my device\",\n" + + " \"author\": \"John Doe \"\n" + + " },\n" + + " {\n" + + " \"timestamp\": 1655198280000,\n" + + " \"id\": \"d2a6087c2b30e18cc55e7cdda345a8d0dfb959a4\",\n" + + " \"name\": \"Devices and assets - v1.0\",\n" + + " \"author\": \"John Doe \"\n" + + " }\n" + + " ],\n" + + " \"totalPages\": 1,\n" + + " \"totalElements\": 3,\n" + + " \"hasNext\": false\n" + + "}" + + MARKDOWN_CODE_BLOCK_END + + TENANT_AUTHORITY_PARAGRAPH) @GetMapping(value = "/version/{entityType}/{externalEntityUuid}", params = {"branch", "pageSize", "page"}) public DeferredResult> listEntityVersions(@PathVariable EntityType entityType, @PathVariable UUID externalEntityUuid, @@ -159,13 +249,12 @@ public class EntitiesVersionControlController extends BaseController { return wrapFuture(versionControlService.listEntityVersions(getTenantId(), branch, externalEntityId, pageLink)); } - @ApiOperation(value = "", notes = "" + - "```\n[\n" + - " {\n" + - " \"id\": \"c30c8bcaed3f0813649f0dee51a89d04d0a12b28\",\n" + - " \"name\": \"Device profiles from dev\"\n" + - " }\n" + - "]\n```") + @ApiOperation(value = "List entity type versions (listEntityTypeVersions)", notes = "" + + "Returns list of versions of an entity type in a branch. This is a collected list of versions that were created " + + "for entities of this type in a remote branch. \n" + + "If specified branch does not exist - empty page data will be returned. " + + "The response structure is the same as for `listEntityVersions` API method." + + TENANT_AUTHORITY_PARAGRAPH) @GetMapping(value = "/version/{entityType}", params = {"branch", "pageSize", "page"}) public DeferredResult> listEntityTypeVersions(@PathVariable EntityType entityType, @RequestParam String branch, @@ -184,21 +273,11 @@ public class EntitiesVersionControlController extends BaseController { return wrapFuture(versionControlService.listEntityTypeVersions(getTenantId(), branch, entityType, pageLink)); } - @ApiOperation(value = "", notes = "" + - "```\n[\n" + - " {\n" + - " \"id\": \"ba9baaca1742b730e7331f31a6a51da5fc7da7f7\",\n" + - " \"name\": \"Device 1 removed\"\n" + - " },\n" + - " {\n" + - " \"id\": \"b3c28d722d328324c7c15b0b30047b0c40011cf7\",\n" + - " \"name\": \"Device profiles added\"\n" + - " },\n" + - " {\n" + - " \"id\": \"c30c8bcaed3f0813649f0dee51a89d04d0a12b28\",\n" + - " \"name\": \"Devices added\"\n" + - " }\n" + - "]\n```") + @ApiOperation(value = "List all versions (listVersions)", notes = "" + + "Lists all available versions in a branch for all entity types. \n" + + "If specified branch does not exist - empty page data will be returned. " + + "The response format is the same as for `listEntityVersions` API method." + + TENANT_AUTHORITY_PARAGRAPH) @GetMapping(value = "/version", params = {"branch", "pageSize", "page"}) public DeferredResult> listVersions(@RequestParam String branch, @ApiParam(value = PAGE_SIZE_DESCRIPTION, required = true) @@ -217,23 +296,42 @@ public class EntitiesVersionControlController extends BaseController { } + @ApiOperation(value = "List entities at version (listEntitiesAtVersion)", notes = "" + + "Returns a list of remote entities of a specific entity type that are available at a concrete version. \n" + + "Each entity item in the result has `externalId` property. " + + "Entities order will be the same as in the repository." + + TENANT_AUTHORITY_PARAGRAPH) @GetMapping(value = "/entity/{entityType}/{versionId}", params = {"branch"}) public DeferredResult> listEntitiesAtVersion(@PathVariable EntityType entityType, + @ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true) @PathVariable String versionId, @RequestParam String branch) throws Exception { accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ); return wrapFuture(versionControlService.listEntitiesAtVersion(getTenantId(), branch, versionId, entityType)); } + @ApiOperation(value = "List all entities at version (listAllEntitiesAtVersion)", notes = "" + + "Returns a list of all remote entities available in a specific version. " + + "Response type is the same as for listAllEntitiesAtVersion API method. \n" + + "Returned entities order will be the same as in the repository." + + TENANT_AUTHORITY_PARAGRAPH) @GetMapping(value = "/entity/{versionId}", params = {"branch"}) - public DeferredResult> listAllEntitiesAtVersion(@PathVariable String versionId, + public DeferredResult> listAllEntitiesAtVersion(@ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true) + @PathVariable String versionId, @RequestParam String branch) throws Exception { accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ); return wrapFuture(versionControlService.listAllEntitiesAtVersion(getTenantId(), branch, versionId)); } + @ApiOperation(value = "Get entity data info (getEntityDataInfo)", notes = "" + + "Retrieves short info about the remote entity by external id at a concrete version. \n" + + "Returned entity data info contains following properties: " + + "`hasRelations` (whether stored entity data contains relations), `hasAttributes` (contains attributes) and " + + "`hasCredentials` (whether stored device data has credentials)." + + TENANT_AUTHORITY_PARAGRAPH) @GetMapping("/info/{versionId}/{entityType}/{externalEntityUuid}") - public DeferredResult getEntityDataInfo(@PathVariable String versionId, + public DeferredResult getEntityDataInfo(@ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true) + @PathVariable String versionId, @PathVariable EntityType entityType, @PathVariable UUID externalEntityUuid) throws Exception { accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ); @@ -241,19 +339,33 @@ public class EntitiesVersionControlController extends BaseController { return wrapFuture(versionControlService.getEntityDataInfo(getCurrentUser(), entityId, versionId)); } + @ApiOperation(value = "Compare entity data to version (compareEntityDataToVersion)", notes = "" + + "Returns an object with current entity data and the one at a specific version. " + + "Entity data structure is the same as stored in a repository. " + + TENANT_AUTHORITY_PARAGRAPH) @GetMapping(value = "/diff/{entityType}/{internalEntityUuid}", params = {"branch", "versionId"}) public DeferredResult compareEntityDataToVersion(@PathVariable EntityType entityType, @PathVariable UUID internalEntityUuid, @RequestParam String branch, + @ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true) @RequestParam String versionId) throws Exception { accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ); EntityId entityId = EntityIdFactory.getByTypeAndUuid(entityType, internalEntityUuid); return wrapFuture(versionControlService.compareEntityDataToVersion(getCurrentUser(), branch, entityId, versionId)); } - @ApiOperation(value = "", notes = "" + - "SINGLE_ENTITY:" + NEW_LINE + - "```\n{\n" + + @ApiOperation(value = "Load entities version (loadEntitiesVersion)", notes = "" + + "Loads specific version of remote entities (or single entity) by request. " + + "Supported entity types: CUSTOMER, ASSET, RULE_CHAIN, DASHBOARD, DEVICE_PROFILE, DEVICE, ENTITY_VIEW, WIDGETS_BUNDLE." + NEW_LINE + + "There are multiple types of request. Each of them requires branch name (`branch`) and version id (`versionId`). " + + "Request of type `SINGLE_ENTITY` is needed to restore a concrete version of a specific entity. It contains " + + "id of a remote entity (`externalEntityId`) and additional configuration (`config`):\n" + + "- `loadRelations` - to update relations list (in case `saveRelations` option was enabled during version creation);\n" + + "- `loadAttributes` - to load entity attributes (if `saveAttributes` config option was enabled);\n" + + "- `loadCredentials` - to update device credentials (if `saveCredentials` option was enabled during version creation)." + NEW_LINE + + "An example of such request:\n" + + MARKDOWN_CODE_BLOCK_START + + "{\n" + " \"type\": \"SINGLE_ENTITY\",\n" + " \n" + " \"branch\": \"dev\",\n" + @@ -265,11 +377,23 @@ public class EntitiesVersionControlController extends BaseController { " },\n" + " \"config\": {\n" + " \"loadRelations\": false,\n" + - " \"findExistingEntityByName\": false\n" + + " \"loadAttributes\": true,\n" + + " \"loadCredentials\": true\n" + " }\n" + - "}\n```" + NEW_LINE + - "ENTITY_TYPE:" + NEW_LINE + - "```\n{\n" + + "}" + + MARKDOWN_CODE_BLOCK_END + NEW_LINE + + "Another request type (`ENTITY_TYPE`) is needed to load specific version of the whole entity types. " + + "It contains a structure with entity types to load and configs for each entity type (`entityTypes`). " + + "For each specified entity type, the method will load all remote entities of this type that are present " + + "at the version. A config for each entity type contains the same options as in `SINGLE_ENTITY` request type, and " + + "additionally contains following options:\n" + + "- `removeOtherEntities` - to remove local entities that are not present on the remote - basically to " + + " overwrite local entity type with the remote one;\n" + + "- `findExistingEntityByName` - when you are loading some remote entities that are not yet present at this tenant, " + + " try to find existing entity by name and update it rather than create new." + NEW_LINE + + "Here is an example of the request to completely restore version of the whole device entity type:\n" + + MARKDOWN_CODE_BLOCK_START + + "{\n" + " \"type\": \"ENTITY_TYPE\",\n" + "\n" + " \"branch\": \"dev\",\n" + @@ -277,12 +401,18 @@ public class EntitiesVersionControlController extends BaseController { "\n" + " \"entityTypes\": {\n" + " \"DEVICE\": {\n" + - " \"loadRelations\": false,\n" + + " \"removeOtherEntities\": true,\n" + " \"findExistingEntityByName\": false,\n" + - " \"removeOtherEntities\": true\n" + + " \"loadRelations\": true,\n" + + " \"loadAttributes\": true,\n" + + " \"loadCredentials\": true\n" + " }\n" + " }\n" + - "}\n```") + "}" + + MARKDOWN_CODE_BLOCK_END + NEW_LINE + + "The response will contain generated request UUID that is to be used to check the status of operation " + + "via `getVersionLoadRequestStatus`." + + TENANT_AUTHORITY_PARAGRAPH) @PostMapping("/entity") public UUID loadEntitiesVersion(@RequestBody VersionLoadRequest request) throws Exception { SecurityUser user = getCurrentUser(); @@ -290,17 +420,54 @@ public class EntitiesVersionControlController extends BaseController { return versionControlService.loadEntitiesVersion(user, request); } - @ApiOperation(value = "", notes = "") + @ApiOperation(value = "Get version load request status (getVersionLoadRequestStatus)", notes = "" + + "Returns the status of previously made version load request. " + + "The structure contains following parameters:\n" + + "- `done` - if the request was successfully processed;\n" + + "- `result` - a list of load results for each entity type:\n" + + " - `created` - created entities count;\n" + + " - `updated` - updated entities count;\n" + + " - `deleted` - removed entities count.\n" + + "- `error` - if an error occurred during processing, error info:\n" + + " - `type` - error type;\n" + + " - `source` - an external id of remote entity;\n" + + " - `target` - if failed to find referenced entity by external id - this external id;\n" + + " - `message` - error message." + NEW_LINE + + "An example of successfully processed request status:\n" + + MARKDOWN_CODE_BLOCK_START + + "{\n" + + " \"done\": true,\n" + + " \"result\": [\n" + + " {\n" + + " \"entityType\": \"DEVICE\",\n" + + " \"created\": 10,\n" + + " \"updated\": 5,\n" + + " \"deleted\": 5\n" + + " },\n" + + " {\n" + + " \"entityType\": \"ASSET\",\n" + + " \"created\": 4,\n" + + " \"updated\": 0,\n" + + " \"deleted\": 8\n" + + " }\n" + + " ]\n" + + "}" + + MARKDOWN_CODE_BLOCK_END + + TENANT_AUTHORITY_PARAGRAPH + ) @GetMapping(value = "/entity/{requestId}/status") public VersionLoadResult getVersionLoadRequestStatus(@ApiParam(value = VC_REQUEST_ID_PARAM_DESCRIPTION, required = true) - @PathVariable UUID requestId) throws Exception { + @PathVariable UUID requestId) throws Exception { accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ); return versionControlService.getVersionLoadStatus(getCurrentUser(), requestId); } - @ApiOperation(value = "", notes = "" + - "```\n[\n" + + @ApiOperation(value = "List branches (listBranches)", notes = "" + + "Lists branches available in the remote repository. \n\n" + + "Response example: \n" + + MARKDOWN_CODE_BLOCK_START + + "[\n" + " {\n" + " \"name\": \"master\",\n" + " \"default\": true\n" + @@ -313,9 +480,10 @@ public class EntitiesVersionControlController extends BaseController { " \"name\": \"dev-2\",\n" + " \"default\": false\n" + " }\n" + - "]\n\n```") + "]" + + MARKDOWN_CODE_BLOCK_END) @GetMapping("/branches") - public DeferredResult> listBranches() throws ThingsboardException { + public DeferredResult> listBranches() throws Exception { try { accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ); final TenantId tenantId = getTenantId(); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index c67f7a5a14..8f37d743b5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EdgeUtils; @@ -53,6 +54,8 @@ import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.service.edge.rpc.constructor.AdminSettingsMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AlarmMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.AssetMsgConstructor; @@ -142,6 +145,13 @@ public abstract class BaseEdgeProcessor { @Autowired protected OtaPackageService otaPackageService; + @Autowired + protected PartitionService partitionService; + + @Autowired + @Lazy + protected TbQueueProducerProvider producerProvider; + @Autowired protected DataValidator deviceValidator; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java index 769d988507..f30111e1ef 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java @@ -52,6 +52,8 @@ import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.util.JsonUtils; @@ -61,9 +63,12 @@ import org.thingsboard.server.gen.edge.v1.EntityDataProto; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.util.TbCoreComponent; import javax.annotation.Nullable; +import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -77,14 +82,19 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { private final Gson gson = new Gson(); + private TbQueueProducer> tbCoreMsgProducer; + + @PostConstruct + public void init() { + tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); + } + public List> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) { log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData); List> result = new ArrayList<>(); EntityId entityId = constructEntityId(entityData); if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) { - // @voba - in terms of performance we should not fetch device from DB by id - // TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId); - TbMsgMetaData metaData = new TbMsgMetaData(); + TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId); metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE); if (entityData.hasPostAttributesMsg()) { result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData)); @@ -96,6 +106,20 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { if (entityData.hasPostTelemetryMsg()) { result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); } + if (EntityType.DEVICE.equals(entityId.getEntityType())) { + DeviceId deviceId = new DeviceId(entityId.getId()); + + TransportProtos.DeviceActivityProto deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setLastActivityTime(System.currentTimeMillis()).build(); + + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); + tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(), + TransportProtos.ToCoreMsg.newBuilder().setDeviceActivityMsg(deviceActivityMsg).build()), null); + } } if (entityData.hasAttributeDeleteMsg()) { result.add(processAttributeDeleteMsg(tenantId, entityId, entityData.getAttributeDeleteMsg(), entityData.getEntityType())); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index d071d591a1..a3bae8bc4e 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -28,6 +28,7 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.msg.MsgType; @@ -236,6 +237,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray()); if (actorMsg.isPresent()) { @@ -520,6 +524,20 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService counters = new ArrayList<>(); @@ -70,6 +72,7 @@ public class TbCoreConsumerStats { this.subscriptionMsgCounter = register(statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS)); this.toCoreNotificationsCounter = register(statsFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS)); this.edgeNotificationsCounter = register(statsFactory.createStatsCounter(statsKey, EDGE_NOTIFICATIONS)); + this.deviceActivitiesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_ACTIVITIES)); } private StatsCounter register(StatsCounter counter){ @@ -112,6 +115,11 @@ public class TbCoreConsumerStats { edgeNotificationsCounter.increment(); } + public void log(TransportProtos.DeviceActivityProto msg) { + totalCounter.increment(); + deviceActivitiesCounter.increment(); + } + public void log(TransportProtos.SubscriptionMgrMsgProto msg) { totalCounter.increment(); subscriptionMsgCounter.increment(); diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index 2b56af82cb..c10a9dd12d 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -1352,17 +1352,17 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { String timeseriesKey = "key"; String timeseriesValue = "25"; data.addProperty(timeseriesKey, timeseriesValue); - UplinkMsg.Builder uplinkMsgBuilder1 = UplinkMsg.newBuilder(); + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); EntityDataProto.Builder entityDataBuilder = EntityDataProto.newBuilder(); entityDataBuilder.setPostTelemetryMsg(JsonConverter.convertToTelemetryProto(data, System.currentTimeMillis())); entityDataBuilder.setEntityType(device.getId().getEntityType().name()); entityDataBuilder.setEntityIdMSB(device.getUuidId().getMostSignificantBits()); entityDataBuilder.setEntityIdLSB(device.getUuidId().getLeastSignificantBits()); testAutoGeneratedCodeByProtobuf(entityDataBuilder); - uplinkMsgBuilder1.addEntityData(entityDataBuilder.build()); + uplinkMsgBuilder.addEntityData(entityDataBuilder.build()); - testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder1); - edgeImitator.sendUplinkMsg(uplinkMsgBuilder1.build()); + testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); JsonObject attributesData = new JsonObject(); String attributesKey = "test_attr"; @@ -1394,9 +1394,24 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { String attributeValuesUrl = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/attributes/" + DataConstants.SERVER_SCOPE; List> attributes = doGetAsyncTyped(attributeValuesUrl, new TypeReference<>() {}); - Assert.assertEquals(2, attributes.size()); - var result = attributes.stream().filter(kv -> kv.get("key").equals(attributesKey)).filter(kv -> kv.get("value").equals(attributesValue)).findFirst(); - Assert.assertTrue(result.isPresent()); + + Assert.assertEquals(3, attributes.size()); + + Optional> activeAttributeOpt = getAttributeByKey("active", attributes); + Assert.assertTrue(activeAttributeOpt.isPresent()); + Map activeAttribute = activeAttributeOpt.get(); + Assert.assertEquals("true", activeAttribute.get("value")); + + Optional> customAttributeOpt = getAttributeByKey(attributesKey, attributes); + Assert.assertTrue(customAttributeOpt.isPresent()); + Map customAttribute = customAttributeOpt.get(); + Assert.assertEquals(attributesValue, customAttribute.get("value")); + + doDelete("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/SERVER_SCOPE?keys=" + attributesKey, String.class); + } + + private Optional> getAttributeByKey(String key, List> attributes) { + return attributes.stream().filter(kv -> kv.get("key").equals(key)).findFirst(); } private Map>> loadDeviceTimeseries(Device device, String timeseriesKey) throws Exception { diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index b7362c7f03..45cbda66d3 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -454,6 +454,14 @@ message GetOtaPackageResponseMsg { string fileName = 8; } +message DeviceActivityProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 deviceIdMSB = 3; + int64 deviceIdLSB = 4; + int64 lastActivityTime = 5; +} + //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. message SubscriptionInfoProto { int64 lastActivityTime = 1; @@ -907,6 +915,7 @@ message ToCoreMsg { SubscriptionMgrMsgProto toSubscriptionMgrMsg = 3; bytes toDeviceActorNotificationMsg = 4; EdgeNotificationMsgProto edgeNotificationMsg = 5; + DeviceActivityProto deviceActivityMsg = 6; } /* High priority messages with low latency are handled by ThingsBoard Core Service separately */