Fix conflicts

This commit is contained in:
Igor Kulikov 2022-06-23 14:19:44 +03:00
commit c82c6adb26
8 changed files with 320 additions and 67 deletions

View File

@ -137,6 +137,7 @@ public class ControllerConstants {
protected static final String EDGE_ASSIGN_RECEIVE_STEP_DESCRIPTION = "(Edge will receive this instantly, if it's currently connected, or once it's going to be connected to platform). ";
protected static final String ENTITY_VERSION_TEXT_SEARCH_DESCRIPTION = "The case insensitive 'substring' filter based on the entity version name.";
protected static final String VERSION_ID_PARAM_DESCRIPTION = "Version id, for example fd82625bdd7d6131cf8027b44ee967012ecaf990. Represents commit hash.";
protected static final String MARKDOWN_CODE_BLOCK_START = "```json\n";
protected static final String MARKDOWN_CODE_BLOCK_END = "\n```";

View File

@ -58,14 +58,19 @@ import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.thingsboard.server.controller.ControllerConstants.MARKDOWN_CODE_BLOCK_END;
import static org.thingsboard.server.controller.ControllerConstants.MARKDOWN_CODE_BLOCK_START;
import static org.thingsboard.server.controller.ControllerConstants.NEW_LINE;
import static org.thingsboard.server.controller.ControllerConstants.PAGE_DATA_PARAMETERS;
import static org.thingsboard.server.controller.ControllerConstants.PAGE_SIZE_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.PAGE_NUMBER_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.ENTITY_VERSION_TEXT_SEARCH_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.SORT_PROPERTY_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.SORT_ORDER_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.SORT_ORDER_ALLOWABLE_VALUES;
import static org.thingsboard.server.controller.ControllerConstants.TENANT_AUTHORITY_PARAGRAPH;
import static org.thingsboard.server.controller.ControllerConstants.VC_REQUEST_ID_PARAM_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.VERSION_ID_PARAM_DESCRIPTION;
@RestController
@TbCoreComponent
@ -77,9 +82,20 @@ public class EntitiesVersionControlController extends BaseController {
private final EntitiesVersionControlService versionControlService;
@ApiOperation(value = "", notes = "" +
"SINGLE_ENTITY:" + NEW_LINE +
"```\n{\n" +
@ApiOperation(value = "Save entities version (saveEntitiesVersion)", notes = "" +
"Creates a new version of entities (or a single entity) by request.\n" +
"Supported entity types: CUSTOMER, ASSET, RULE_CHAIN, DASHBOARD, DEVICE_PROFILE, DEVICE, ENTITY_VIEW, WIDGETS_BUNDLE." + NEW_LINE +
"There are two available types of request: `SINGLE_ENTITY` and `COMPLEX`. " +
"Each of them contains version name (`versionName`) and name of a branch (`branch`) to create version (commit) in. " +
"If specified branch does not exists in a remote repo, then new empty branch will be created. " +
"Request of the `SINGLE_ENTITY` type has id of an entity (`entityId`) and additional configuration (`config`) " +
"which has following options: \n" +
"- `saveRelations` - whether to add inbound and outbound relations of type COMMON to created entity version;\n" +
"- `saveAttributes` - to save attributes of server scope (and also shared scope for devices);\n" +
"- `saveCredentials` - when saving a version of a device, to add its credentials to the version." + NEW_LINE +
"An example of a `SINGLE_ENTITY` version create request:\n" +
MARKDOWN_CODE_BLOCK_START +
"{\n" +
" \"type\": \"SINGLE_ENTITY\",\n" +
"\n" +
" \"versionName\": \"Version 1.0\",\n" +
@ -90,11 +106,25 @@ public class EntitiesVersionControlController extends BaseController {
" \"id\": \"b79448e0-d4f4-11ec-847b-0f432358ab48\"\n" +
" },\n" +
" \"config\": {\n" +
" \"saveRelations\": true\n" +
" \"saveRelations\": true,\n" +
" \"saveAttributes\": true,\n" +
" \"saveCredentials\": false\n" +
" }\n" +
"}\n```" + NEW_LINE +
"COMPLEX:" + NEW_LINE +
"```\n{\n" +
"}" +
MARKDOWN_CODE_BLOCK_END + NEW_LINE +
"Second request type (`COMPLEX`), additionally to `branch` and `versionName`, contains following properties:\n" +
"- `entityTypes` - a structure with entity types to export and configuration for each entity type; " +
" this configuration has all the options available for `SINGLE_ENTITY` and additionally has these ones: \n" +
" - `allEntities` and `entityIds` - if you want to save the version of all entities of the entity type " +
" then set `allEntities` param to true, otherwise set it to false and specify the list of specific entities (`entityIds`);\n" +
" - `syncStrategy` - synchronization strategy to use for this entity type: when set to `OVERWRITE` " +
" then the list of remote entities of this type will be overwritten by newly added entities. If set to " +
" `MERGE` - existing remote entities of this entity type will not be removed, new entities will just " +
" be added on top (or existing remote entities will be updated).\n" +
"- `syncStrategy` - default synchronization strategy to use when it is not specified for an entity type." + NEW_LINE +
"Example for this type of request:\n" +
MARKDOWN_CODE_BLOCK_START +
"{\n" +
" \"type\": \"COMPLEX\",\n" +
"\n" +
" \"versionName\": \"Devices and profiles: release 2\",\n" +
@ -105,7 +135,9 @@ public class EntitiesVersionControlController extends BaseController {
" \"DEVICE\": {\n" +
" \"syncStrategy\": null,\n" +
" \"allEntities\": true,\n" +
" \"saveRelations\": true\n" +
" \"saveRelations\": true,\n" +
" \"saveAttributes\": true,\n" +
" \"saveCredentials\": true\n" +
" },\n" +
" \"DEVICE_PROFILE\": {\n" +
" \"syncStrategy\": \"MERGE\",\n" +
@ -116,7 +148,11 @@ public class EntitiesVersionControlController extends BaseController {
" \"saveRelations\": true\n" +
" }\n" +
" }\n" +
"}\n```")
"}" +
MARKDOWN_CODE_BLOCK_END + NEW_LINE +
"Response wil contain generated request UUID, that can be then used to retrieve " +
"status of operation via `getVersionCreateRequestStatus`.\n" +
TENANT_AUTHORITY_PARAGRAPH)
@PostMapping("/version")
public DeferredResult<UUID> saveEntitiesVersion(@RequestBody VersionCreateRequest request) throws Exception {
SecurityUser user = getCurrentUser();
@ -124,7 +160,32 @@ public class EntitiesVersionControlController extends BaseController {
return wrapFuture(versionControlService.saveEntitiesVersion(user, request));
}
@ApiOperation(value = "", notes = "")
@ApiOperation(value = "Get version create request status (getVersionCreateRequestStatus)", notes = "" +
"Returns the status of previously made version create request. " + NEW_LINE +
"This status contains following properties:\n" +
"- `done` - whether request processing is finished;\n" +
"- `version` - created version info: timestamp, version id (commit hash), commit name and commit author;\n" +
"- `added` - count of items that were created in the remote repo;\n" +
"- `modified` - modified items count;\n" +
"- `removed` - removed items count;\n" +
"- `error` - error message, if an error occurred while handling the request." + NEW_LINE +
"An example of successful status:\n" +
MARKDOWN_CODE_BLOCK_START +
"{\n" +
" \"done\": true,\n" +
" \"added\": 10,\n" +
" \"modified\": 2,\n" +
" \"removed\": 5,\n" +
" \"version\": {\n" +
" \"timestamp\": 1655198528000,\n" +
" \"id\":\"8a834dd389ed80e0759ba8ee338b3f1fd160a114\",\n" +
" \"name\": \"My devices v2.0\",\n" +
" \"author\": \"John Doe\"\n" +
" },\n" +
" \"error\": null\n" +
"}" +
MARKDOWN_CODE_BLOCK_END +
TENANT_AUTHORITY_PARAGRAPH)
@GetMapping(value = "/version/{requestId}/status")
public VersionCreationResult getVersionCreateRequestStatus(@ApiParam(value = VC_REQUEST_ID_PARAM_DESCRIPTION, required = true)
@PathVariable UUID requestId) throws Exception {
@ -132,13 +193,42 @@ public class EntitiesVersionControlController extends BaseController {
return versionControlService.getVersionCreateStatus(getCurrentUser(), requestId);
}
@ApiOperation(value = "", notes = "" +
"```\n[\n" +
" {\n" +
" \"id\": \"c30c8bcaed3f0813649f0dee51a89d04d0a12b28\",\n" +
" \"name\": \"Device profile 1 version 1.0\"\n" +
" }\n" +
"]\n```")
@ApiOperation(value = "List entity versions (listEntityVersions)", notes = "" +
"Returns list of versions for a specific entity in a concrete branch. \n" +
"You need to specify external id of an entity to list versions for. This is `externalId` property of an entity, " +
"or otherwise if not set - simply id of this entity. \n" +
"If specified branch does not exist - empty page data will be returned. " + NEW_LINE +
"Each version info item has timestamp, id, name and author. Version id can then be used to restore the version. " +
PAGE_DATA_PARAMETERS + NEW_LINE +
"Response example: \n" +
MARKDOWN_CODE_BLOCK_START +
"{\n" +
" \"data\": [\n" +
" {\n" +
" \"timestamp\": 1655198593000,\n" +
" \"id\": \"fd82625bdd7d6131cf8027b44ee967012ecaf990\",\n" +
" \"name\": \"Devices and assets - v2.0\",\n" +
" \"author\": \"John Doe <johndoe@gmail.com>\"\n" +
" },\n" +
" {\n" +
" \"timestamp\": 1655198528000,\n" +
" \"id\": \"682adcffa9c8a2f863af6f00c4850323acbd4219\",\n" +
" \"name\": \"Update my device\",\n" +
" \"author\": \"John Doe <johndoe@gmail.com>\"\n" +
" },\n" +
" {\n" +
" \"timestamp\": 1655198280000,\n" +
" \"id\": \"d2a6087c2b30e18cc55e7cdda345a8d0dfb959a4\",\n" +
" \"name\": \"Devices and assets - v1.0\",\n" +
" \"author\": \"John Doe <johndoe@gmail.com>\"\n" +
" }\n" +
" ],\n" +
" \"totalPages\": 1,\n" +
" \"totalElements\": 3,\n" +
" \"hasNext\": false\n" +
"}" +
MARKDOWN_CODE_BLOCK_END +
TENANT_AUTHORITY_PARAGRAPH)
@GetMapping(value = "/version/{entityType}/{externalEntityUuid}", params = {"branch", "pageSize", "page"})
public DeferredResult<PageData<EntityVersion>> listEntityVersions(@PathVariable EntityType entityType,
@PathVariable UUID externalEntityUuid,
@ -159,13 +249,12 @@ public class EntitiesVersionControlController extends BaseController {
return wrapFuture(versionControlService.listEntityVersions(getTenantId(), branch, externalEntityId, pageLink));
}
@ApiOperation(value = "", notes = "" +
"```\n[\n" +
" {\n" +
" \"id\": \"c30c8bcaed3f0813649f0dee51a89d04d0a12b28\",\n" +
" \"name\": \"Device profiles from dev\"\n" +
" }\n" +
"]\n```")
@ApiOperation(value = "List entity type versions (listEntityTypeVersions)", notes = "" +
"Returns list of versions of an entity type in a branch. This is a collected list of versions that were created " +
"for entities of this type in a remote branch. \n" +
"If specified branch does not exist - empty page data will be returned. " +
"The response structure is the same as for `listEntityVersions` API method." +
TENANT_AUTHORITY_PARAGRAPH)
@GetMapping(value = "/version/{entityType}", params = {"branch", "pageSize", "page"})
public DeferredResult<PageData<EntityVersion>> listEntityTypeVersions(@PathVariable EntityType entityType,
@RequestParam String branch,
@ -184,21 +273,11 @@ public class EntitiesVersionControlController extends BaseController {
return wrapFuture(versionControlService.listEntityTypeVersions(getTenantId(), branch, entityType, pageLink));
}
@ApiOperation(value = "", notes = "" +
"```\n[\n" +
" {\n" +
" \"id\": \"ba9baaca1742b730e7331f31a6a51da5fc7da7f7\",\n" +
" \"name\": \"Device 1 removed\"\n" +
" },\n" +
" {\n" +
" \"id\": \"b3c28d722d328324c7c15b0b30047b0c40011cf7\",\n" +
" \"name\": \"Device profiles added\"\n" +
" },\n" +
" {\n" +
" \"id\": \"c30c8bcaed3f0813649f0dee51a89d04d0a12b28\",\n" +
" \"name\": \"Devices added\"\n" +
" }\n" +
"]\n```")
@ApiOperation(value = "List all versions (listVersions)", notes = "" +
"Lists all available versions in a branch for all entity types. \n" +
"If specified branch does not exist - empty page data will be returned. " +
"The response format is the same as for `listEntityVersions` API method." +
TENANT_AUTHORITY_PARAGRAPH)
@GetMapping(value = "/version", params = {"branch", "pageSize", "page"})
public DeferredResult<PageData<EntityVersion>> listVersions(@RequestParam String branch,
@ApiParam(value = PAGE_SIZE_DESCRIPTION, required = true)
@ -217,23 +296,42 @@ public class EntitiesVersionControlController extends BaseController {
}
@ApiOperation(value = "List entities at version (listEntitiesAtVersion)", notes = "" +
"Returns a list of remote entities of a specific entity type that are available at a concrete version. \n" +
"Each entity item in the result has `externalId` property. " +
"Entities order will be the same as in the repository." +
TENANT_AUTHORITY_PARAGRAPH)
@GetMapping(value = "/entity/{entityType}/{versionId}", params = {"branch"})
public DeferredResult<List<VersionedEntityInfo>> listEntitiesAtVersion(@PathVariable EntityType entityType,
@ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true)
@PathVariable String versionId,
@RequestParam String branch) throws Exception {
accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ);
return wrapFuture(versionControlService.listEntitiesAtVersion(getTenantId(), branch, versionId, entityType));
}
@ApiOperation(value = "List all entities at version (listAllEntitiesAtVersion)", notes = "" +
"Returns a list of all remote entities available in a specific version. " +
"Response type is the same as for listAllEntitiesAtVersion API method. \n" +
"Returned entities order will be the same as in the repository." +
TENANT_AUTHORITY_PARAGRAPH)
@GetMapping(value = "/entity/{versionId}", params = {"branch"})
public DeferredResult<List<VersionedEntityInfo>> listAllEntitiesAtVersion(@PathVariable String versionId,
public DeferredResult<List<VersionedEntityInfo>> listAllEntitiesAtVersion(@ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true)
@PathVariable String versionId,
@RequestParam String branch) throws Exception {
accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ);
return wrapFuture(versionControlService.listAllEntitiesAtVersion(getTenantId(), branch, versionId));
}
@ApiOperation(value = "Get entity data info (getEntityDataInfo)", notes = "" +
"Retrieves short info about the remote entity by external id at a concrete version. \n" +
"Returned entity data info contains following properties: " +
"`hasRelations` (whether stored entity data contains relations), `hasAttributes` (contains attributes) and " +
"`hasCredentials` (whether stored device data has credentials)." +
TENANT_AUTHORITY_PARAGRAPH)
@GetMapping("/info/{versionId}/{entityType}/{externalEntityUuid}")
public DeferredResult<EntityDataInfo> getEntityDataInfo(@PathVariable String versionId,
public DeferredResult<EntityDataInfo> getEntityDataInfo(@ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true)
@PathVariable String versionId,
@PathVariable EntityType entityType,
@PathVariable UUID externalEntityUuid) throws Exception {
accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ);
@ -241,19 +339,33 @@ public class EntitiesVersionControlController extends BaseController {
return wrapFuture(versionControlService.getEntityDataInfo(getCurrentUser(), entityId, versionId));
}
@ApiOperation(value = "Compare entity data to version (compareEntityDataToVersion)", notes = "" +
"Returns an object with current entity data and the one at a specific version. " +
"Entity data structure is the same as stored in a repository. " +
TENANT_AUTHORITY_PARAGRAPH)
@GetMapping(value = "/diff/{entityType}/{internalEntityUuid}", params = {"branch", "versionId"})
public DeferredResult<EntityDataDiff> compareEntityDataToVersion(@PathVariable EntityType entityType,
@PathVariable UUID internalEntityUuid,
@RequestParam String branch,
@ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true)
@RequestParam String versionId) throws Exception {
accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ);
EntityId entityId = EntityIdFactory.getByTypeAndUuid(entityType, internalEntityUuid);
return wrapFuture(versionControlService.compareEntityDataToVersion(getCurrentUser(), branch, entityId, versionId));
}
@ApiOperation(value = "", notes = "" +
"SINGLE_ENTITY:" + NEW_LINE +
"```\n{\n" +
@ApiOperation(value = "Load entities version (loadEntitiesVersion)", notes = "" +
"Loads specific version of remote entities (or single entity) by request. " +
"Supported entity types: CUSTOMER, ASSET, RULE_CHAIN, DASHBOARD, DEVICE_PROFILE, DEVICE, ENTITY_VIEW, WIDGETS_BUNDLE." + NEW_LINE +
"There are multiple types of request. Each of them requires branch name (`branch`) and version id (`versionId`). " +
"Request of type `SINGLE_ENTITY` is needed to restore a concrete version of a specific entity. It contains " +
"id of a remote entity (`externalEntityId`) and additional configuration (`config`):\n" +
"- `loadRelations` - to update relations list (in case `saveRelations` option was enabled during version creation);\n" +
"- `loadAttributes` - to load entity attributes (if `saveAttributes` config option was enabled);\n" +
"- `loadCredentials` - to update device credentials (if `saveCredentials` option was enabled during version creation)." + NEW_LINE +
"An example of such request:\n" +
MARKDOWN_CODE_BLOCK_START +
"{\n" +
" \"type\": \"SINGLE_ENTITY\",\n" +
" \n" +
" \"branch\": \"dev\",\n" +
@ -265,11 +377,23 @@ public class EntitiesVersionControlController extends BaseController {
" },\n" +
" \"config\": {\n" +
" \"loadRelations\": false,\n" +
" \"findExistingEntityByName\": false\n" +
" \"loadAttributes\": true,\n" +
" \"loadCredentials\": true\n" +
" }\n" +
"}\n```" + NEW_LINE +
"ENTITY_TYPE:" + NEW_LINE +
"```\n{\n" +
"}" +
MARKDOWN_CODE_BLOCK_END + NEW_LINE +
"Another request type (`ENTITY_TYPE`) is needed to load specific version of the whole entity types. " +
"It contains a structure with entity types to load and configs for each entity type (`entityTypes`). " +
"For each specified entity type, the method will load all remote entities of this type that are present " +
"at the version. A config for each entity type contains the same options as in `SINGLE_ENTITY` request type, and " +
"additionally contains following options:\n" +
"- `removeOtherEntities` - to remove local entities that are not present on the remote - basically to " +
" overwrite local entity type with the remote one;\n" +
"- `findExistingEntityByName` - when you are loading some remote entities that are not yet present at this tenant, " +
" try to find existing entity by name and update it rather than create new." + NEW_LINE +
"Here is an example of the request to completely restore version of the whole device entity type:\n" +
MARKDOWN_CODE_BLOCK_START +
"{\n" +
" \"type\": \"ENTITY_TYPE\",\n" +
"\n" +
" \"branch\": \"dev\",\n" +
@ -277,12 +401,18 @@ public class EntitiesVersionControlController extends BaseController {
"\n" +
" \"entityTypes\": {\n" +
" \"DEVICE\": {\n" +
" \"loadRelations\": false,\n" +
" \"removeOtherEntities\": true,\n" +
" \"findExistingEntityByName\": false,\n" +
" \"removeOtherEntities\": true\n" +
" \"loadRelations\": true,\n" +
" \"loadAttributes\": true,\n" +
" \"loadCredentials\": true\n" +
" }\n" +
" }\n" +
"}\n```")
"}" +
MARKDOWN_CODE_BLOCK_END + NEW_LINE +
"The response will contain generated request UUID that is to be used to check the status of operation " +
"via `getVersionLoadRequestStatus`." +
TENANT_AUTHORITY_PARAGRAPH)
@PostMapping("/entity")
public UUID loadEntitiesVersion(@RequestBody VersionLoadRequest request) throws Exception {
SecurityUser user = getCurrentUser();
@ -290,17 +420,54 @@ public class EntitiesVersionControlController extends BaseController {
return versionControlService.loadEntitiesVersion(user, request);
}
@ApiOperation(value = "", notes = "")
@ApiOperation(value = "Get version load request status (getVersionLoadRequestStatus)", notes = "" +
"Returns the status of previously made version load request. " +
"The structure contains following parameters:\n" +
"- `done` - if the request was successfully processed;\n" +
"- `result` - a list of load results for each entity type:\n" +
" - `created` - created entities count;\n" +
" - `updated` - updated entities count;\n" +
" - `deleted` - removed entities count.\n" +
"- `error` - if an error occurred during processing, error info:\n" +
" - `type` - error type;\n" +
" - `source` - an external id of remote entity;\n" +
" - `target` - if failed to find referenced entity by external id - this external id;\n" +
" - `message` - error message." + NEW_LINE +
"An example of successfully processed request status:\n" +
MARKDOWN_CODE_BLOCK_START +
"{\n" +
" \"done\": true,\n" +
" \"result\": [\n" +
" {\n" +
" \"entityType\": \"DEVICE\",\n" +
" \"created\": 10,\n" +
" \"updated\": 5,\n" +
" \"deleted\": 5\n" +
" },\n" +
" {\n" +
" \"entityType\": \"ASSET\",\n" +
" \"created\": 4,\n" +
" \"updated\": 0,\n" +
" \"deleted\": 8\n" +
" }\n" +
" ]\n" +
"}" +
MARKDOWN_CODE_BLOCK_END +
TENANT_AUTHORITY_PARAGRAPH
)
@GetMapping(value = "/entity/{requestId}/status")
public VersionLoadResult getVersionLoadRequestStatus(@ApiParam(value = VC_REQUEST_ID_PARAM_DESCRIPTION, required = true)
@PathVariable UUID requestId) throws Exception {
@PathVariable UUID requestId) throws Exception {
accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ);
return versionControlService.getVersionLoadStatus(getCurrentUser(), requestId);
}
@ApiOperation(value = "", notes = "" +
"```\n[\n" +
@ApiOperation(value = "List branches (listBranches)", notes = "" +
"Lists branches available in the remote repository. \n\n" +
"Response example: \n" +
MARKDOWN_CODE_BLOCK_START +
"[\n" +
" {\n" +
" \"name\": \"master\",\n" +
" \"default\": true\n" +
@ -313,9 +480,10 @@ public class EntitiesVersionControlController extends BaseController {
" \"name\": \"dev-2\",\n" +
" \"default\": false\n" +
" }\n" +
"]\n\n```")
"]" +
MARKDOWN_CODE_BLOCK_END)
@GetMapping("/branches")
public DeferredResult<List<BranchInfo>> listBranches() throws ThingsboardException {
public DeferredResult<List<BranchInfo>> listBranches() throws Exception {
try {
accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ);
final TenantId tenantId = getTenantId();

View File

@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EdgeUtils;
@ -53,6 +54,8 @@ import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.service.edge.rpc.constructor.AdminSettingsMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.AlarmMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.AssetMsgConstructor;
@ -142,6 +145,13 @@ public abstract class BaseEdgeProcessor {
@Autowired
protected OtaPackageService otaPackageService;
@Autowired
protected PartitionService partitionService;
@Autowired
@Lazy
protected TbQueueProducerProvider producerProvider;
@Autowired
protected DataValidator<Device> deviceValidator;

View File

@ -52,6 +52,8 @@ import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.util.JsonUtils;
@ -61,9 +63,12 @@ import org.thingsboard.server.gen.edge.v1.EntityDataProto;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.util.TbCoreComponent;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -77,14 +82,19 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
private final Gson gson = new Gson();
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> tbCoreMsgProducer;
@PostConstruct
public void init() {
tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
}
public List<ListenableFuture<Void>> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) {
log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData);
List<ListenableFuture<Void>> result = new ArrayList<>();
EntityId entityId = constructEntityId(entityData);
if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) {
// @voba - in terms of performance we should not fetch device from DB by id
// TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId);
TbMsgMetaData metaData = new TbMsgMetaData();
TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId);
metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE);
if (entityData.hasPostAttributesMsg()) {
result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData));
@ -96,6 +106,20 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
if (entityData.hasPostTelemetryMsg()) {
result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData));
}
if (EntityType.DEVICE.equals(entityId.getEntityType())) {
DeviceId deviceId = new DeviceId(entityId.getId());
TransportProtos.DeviceActivityProto deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastActivityTime(System.currentTimeMillis()).build();
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(),
TransportProtos.ToCoreMsg.newBuilder().setDeviceActivityMsg(deviceActivityMsg).build()), null);
}
}
if (entityData.hasAttributeDeleteMsg()) {
result.add(processAttributeDeleteMsg(tenantId, entityId, entityData.getAttributeDeleteMsg(), entityData.getEntityType()));

View File

@ -28,6 +28,7 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.msg.MsgType;
@ -236,6 +237,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
} else if (toCoreMsg.hasEdgeNotificationMsg()) {
log.trace("[{}] Forwarding message to edge service {}", id, toCoreMsg.getEdgeNotificationMsg());
forwardToEdgeNotificationService(toCoreMsg.getEdgeNotificationMsg(), callback);
} else if (toCoreMsg.hasDeviceActivityMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceActivityMsg());
forwardToStateService(toCoreMsg.getDeviceActivityMsg(), callback);
} else if (!toCoreMsg.getToDeviceActorNotificationMsg().isEmpty()) {
Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray());
if (actorMsg.isPresent()) {
@ -520,6 +524,20 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
stateService.onQueueMsg(deviceStateServiceMsg, callback);
}
private void forwardToStateService(TransportProtos.DeviceActivityProto deviceActivityMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceActivityMsg);
}
TenantId tenantId = TenantId.fromUUID(new UUID(deviceActivityMsg.getTenantIdMSB(), deviceActivityMsg.getTenantIdLSB()));
DeviceId deviceId = new DeviceId(new UUID(deviceActivityMsg.getDeviceIdMSB(), deviceActivityMsg.getDeviceIdLSB()));
try {
stateService.onDeviceActivity(tenantId, deviceId, deviceActivityMsg.getLastActivityTime());
callback.onSuccess();
} catch (Exception e) {
callback.onFailure(new RuntimeException("Failed update device activity for device [" + deviceId.getId() + "]!", e));
}
}
private void forwardToEdgeNotificationService(EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(edgeNotificationMsg);

View File

@ -38,6 +38,7 @@ public class TbCoreConsumerStats {
public static final String SUBSCRIPTION_MSGS = "subMsgs";
public static final String TO_CORE_NOTIFICATIONS = "coreNfs";
public static final String EDGE_NOTIFICATIONS = "edgeNfs";
public static final String DEVICE_ACTIVITIES = "deviceActivity";
private final StatsCounter totalCounter;
private final StatsCounter sessionEventCounter;
@ -52,6 +53,7 @@ public class TbCoreConsumerStats {
private final StatsCounter subscriptionMsgCounter;
private final StatsCounter toCoreNotificationsCounter;
private final StatsCounter edgeNotificationsCounter;
private final StatsCounter deviceActivitiesCounter;
private final List<StatsCounter> counters = new ArrayList<>();
@ -70,6 +72,7 @@ public class TbCoreConsumerStats {
this.subscriptionMsgCounter = register(statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS));
this.toCoreNotificationsCounter = register(statsFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS));
this.edgeNotificationsCounter = register(statsFactory.createStatsCounter(statsKey, EDGE_NOTIFICATIONS));
this.deviceActivitiesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_ACTIVITIES));
}
private StatsCounter register(StatsCounter counter){
@ -112,6 +115,11 @@ public class TbCoreConsumerStats {
edgeNotificationsCounter.increment();
}
public void log(TransportProtos.DeviceActivityProto msg) {
totalCounter.increment();
deviceActivitiesCounter.increment();
}
public void log(TransportProtos.SubscriptionMgrMsgProto msg) {
totalCounter.increment();
subscriptionMsgCounter.increment();

View File

@ -1352,17 +1352,17 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
String timeseriesKey = "key";
String timeseriesValue = "25";
data.addProperty(timeseriesKey, timeseriesValue);
UplinkMsg.Builder uplinkMsgBuilder1 = UplinkMsg.newBuilder();
UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
EntityDataProto.Builder entityDataBuilder = EntityDataProto.newBuilder();
entityDataBuilder.setPostTelemetryMsg(JsonConverter.convertToTelemetryProto(data, System.currentTimeMillis()));
entityDataBuilder.setEntityType(device.getId().getEntityType().name());
entityDataBuilder.setEntityIdMSB(device.getUuidId().getMostSignificantBits());
entityDataBuilder.setEntityIdLSB(device.getUuidId().getLeastSignificantBits());
testAutoGeneratedCodeByProtobuf(entityDataBuilder);
uplinkMsgBuilder1.addEntityData(entityDataBuilder.build());
uplinkMsgBuilder.addEntityData(entityDataBuilder.build());
testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder1);
edgeImitator.sendUplinkMsg(uplinkMsgBuilder1.build());
testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder);
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
JsonObject attributesData = new JsonObject();
String attributesKey = "test_attr";
@ -1394,9 +1394,24 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
String attributeValuesUrl = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/attributes/" + DataConstants.SERVER_SCOPE;
List<Map<String, String>> attributes = doGetAsyncTyped(attributeValuesUrl, new TypeReference<>() {});
Assert.assertEquals(2, attributes.size());
var result = attributes.stream().filter(kv -> kv.get("key").equals(attributesKey)).filter(kv -> kv.get("value").equals(attributesValue)).findFirst();
Assert.assertTrue(result.isPresent());
Assert.assertEquals(3, attributes.size());
Optional<Map<String, String>> activeAttributeOpt = getAttributeByKey("active", attributes);
Assert.assertTrue(activeAttributeOpt.isPresent());
Map<String, String> activeAttribute = activeAttributeOpt.get();
Assert.assertEquals("true", activeAttribute.get("value"));
Optional<Map<String, String>> customAttributeOpt = getAttributeByKey(attributesKey, attributes);
Assert.assertTrue(customAttributeOpt.isPresent());
Map<String, String> customAttribute = customAttributeOpt.get();
Assert.assertEquals(attributesValue, customAttribute.get("value"));
doDelete("/api/plugins/telemetry/DEVICE/" + device.getId().getId() + "/SERVER_SCOPE?keys=" + attributesKey, String.class);
}
private Optional<Map<String, String>> getAttributeByKey(String key, List<Map<String, String>> attributes) {
return attributes.stream().filter(kv -> kv.get("key").equals(key)).findFirst();
}
private Map<String, List<Map<String, String>>> loadDeviceTimeseries(Device device, String timeseriesKey) throws Exception {

View File

@ -454,6 +454,14 @@ message GetOtaPackageResponseMsg {
string fileName = 8;
}
message DeviceActivityProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 deviceIdMSB = 3;
int64 deviceIdLSB = 4;
int64 lastActivityTime = 5;
}
//Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.
message SubscriptionInfoProto {
int64 lastActivityTime = 1;
@ -907,6 +915,7 @@ message ToCoreMsg {
SubscriptionMgrMsgProto toSubscriptionMgrMsg = 3;
bytes toDeviceActorNotificationMsg = 4;
EdgeNotificationMsgProto edgeNotificationMsg = 5;
DeviceActivityProto deviceActivityMsg = 6;
}
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */