From e58d5b2d8a7e3760525ce5c9a0f497b299a9625d Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Mon, 14 Jun 2021 15:06:21 +0300 Subject: [PATCH] removed rpc status sent, added idx_rpc_tenant_id_device_id --- .../main/data/upgrade/3.2.2/schema_update.sql | 2 + .../device/DeviceActorMessageProcessor.java | 52 +++++++--------- .../server/controller/RpcController.java | 39 +++++++++++- .../action/RuleEngineEntityActionService.java | 2 +- .../server/service/rpc/TbRpcService.java | 11 ++-- .../server/dao/rpc/RpcService.java | 8 ++- .../server/common/data/DataConstants.java | 1 - .../server/common/data/rpc/RpcStatus.java | 2 +- .../server/dao/rpc/BaseRpcService.java | 60 +++++++++++++++---- .../thingsboard/server/dao/rpc/RpcDao.java | 4 +- .../server/dao/sql/rpc/JpaRpcDao.java | 8 ++- .../server/dao/sql/rpc/RpcRepository.java | 4 +- .../server/dao/tenant/TenantServiceImpl.java | 5 ++ .../resources/sql/schema-entities-idx.sql | 1 + .../engine/filter/TbMsgTypeSwitchNode.java | 4 +- .../src/app/shared/models/rule-node.models.ts | 6 +- 16 files changed, 147 insertions(+), 62 deletions(-) diff --git a/application/src/main/data/upgrade/3.2.2/schema_update.sql b/application/src/main/data/upgrade/3.2.2/schema_update.sql index 713793b341..d4404427f2 100644 --- a/application/src/main/data/upgrade/3.2.2/schema_update.sql +++ b/application/src/main/data/upgrade/3.2.2/schema_update.sql @@ -209,3 +209,5 @@ CREATE TABLE IF NOT EXISTS rpc ( status varchar(255) NOT NULL ); +CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id); + diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 8657313633..a1458fd5ac 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -174,20 +174,15 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { long timeout = request.getExpirationTime() - System.currentTimeMillis(); boolean persisted = request.isPersisted(); + if (timeout <= 0) { log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime()); - if (persisted) { - Rpc rpc = new Rpc(new RpcId(request.getId())); - rpc.setCreatedTime(System.currentTimeMillis()); - rpc.setTenantId(tenantId); - rpc.setDeviceId(deviceId); - rpc.setExpirationTime(request.getExpirationTime()); - rpc.setRequest(JacksonUtil.valueToTree(request)); - rpc.setStatus(RpcStatus.TIMEOUT); - systemContext.getTbRpcService().save(tenantId, rpc); + createRpc(request, RpcStatus.TIMEOUT); } return; + } else if (persisted) { + createRpc(request, RpcStatus.QUEUED); } boolean sent; @@ -204,24 +199,14 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { syncSessionSet.add(key); } }); - log.trace("46) Rpc syncSessionSet [{}] subscription after sent [{}]",syncSessionSet, rpcSubscriptions); + log.trace("46) Rpc syncSessionSet [{}] subscription after sent [{}]", syncSessionSet, rpcSubscriptions); syncSessionSet.forEach(rpcSubscriptions::remove); } - if (persisted) { - Rpc rpc = new Rpc(new RpcId(request.getId())); - rpc.setCreatedTime(System.currentTimeMillis()); - rpc.setTenantId(tenantId); - rpc.setDeviceId(deviceId); - rpc.setExpirationTime(request.getExpirationTime()); - rpc.setRequest(JacksonUtil.valueToTree(request)); - rpc.setStatus(sent ? RpcStatus.SENT : RpcStatus.QUEUED); - systemContext.getTbRpcService().save(tenantId, rpc); - if (!(sent || request.isOneway())) { - ObjectNode response = JacksonUtil.newObjectNode(); - response.put("rpcId", request.getId().toString()); - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), JacksonUtil.toString(response), null)); - } + if (persisted && !(sent || request.isOneway())) { + ObjectNode response = JacksonUtil.newObjectNode(); + response.put("rpcId", request.getId().toString()); + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), JacksonUtil.toString(response), null)); } if (request.isOneway() && sent) { @@ -237,6 +222,18 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } + private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) { + Rpc rpc = new Rpc(new RpcId(request.getId())); + rpc.setCreatedTime(System.currentTimeMillis()); + rpc.setTenantId(tenantId); + rpc.setDeviceId(deviceId); + rpc.setExpirationTime(request.getExpirationTime()); + rpc.setRequest(JacksonUtil.valueToTree(request)); + rpc.setStatus(status); + systemContext.getTbRpcService().save(tenantId, rpc); + return systemContext.getTbRpcService().save(tenantId, rpc); + } + private ToDeviceRpcRequestMsg creteToDeviceRpcRequestMsg(ToDeviceRpcRequest request) { ToDeviceRpcRequestBody body = request.getBody(); return ToDeviceRpcRequestMsg.newBuilder() @@ -318,9 +315,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { .setPersisted(request.isPersisted()) .build(); - if (request.isPersisted()) { - systemContext.getTbRpcService().save(tenantId, new RpcId(request.getId()), RpcStatus.SENT, null); - } sendToTransport(rpcRequest, sessionId, nodeId); }; } @@ -821,10 +815,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { void init(TbActorCtx ctx) { schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout()); - PageLink pageLink = new PageLink(10); + PageLink pageLink = new PageLink(1024); PageData pageData; do { - pageData = systemContext.getTbRpcService().findAllByDeviceIdAndStatus(deviceId, RpcStatus.QUEUED, pageLink); + pageData = systemContext.getTbRpcService().findAllByDeviceIdAndStatus(tenantId, deviceId, RpcStatus.QUEUED, pageLink); pageData.getData().forEach(rpc -> { ToDeviceRpcRequest msg = JacksonUtil.convertValue(rpc.getRequest(), ToDeviceRpcRequest.class); long timeout = rpc.getExpirationTime() - System.currentTimeMillis(); diff --git a/application/src/main/java/org/thingsboard/server/controller/RpcController.java b/application/src/main/java/org/thingsboard/server/controller/RpcController.java index 8a75aad485..eae26542fa 100644 --- a/application/src/main/java/org/thingsboard/server/controller/RpcController.java +++ b/application/src/main/java/org/thingsboard/server/controller/RpcController.java @@ -29,6 +29,7 @@ import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; @@ -41,7 +42,10 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RpcId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UUIDBased; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.rpc.Rpc; +import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.queue.util.TbCoreComponent; @@ -95,7 +99,7 @@ public class RpcController extends BaseController { return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody); } - @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") @RequestMapping(value = "/persisted/{rpcId}", method = RequestMethod.GET) @ResponseBody public Rpc getPersistedRpc(@PathVariable("rpcId") String strRpc) throws ThingsboardException { @@ -108,6 +112,39 @@ public class RpcController extends BaseController { } } + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") + @RequestMapping(value = "/persisted/{deviceId}", method = RequestMethod.GET) + @ResponseBody + public PageData getPersistedRpcByDevice(@PathVariable("deviceId") String strDeviceId, + @RequestParam int pageSize, + @RequestParam int page, + @RequestParam RpcStatus rpcStatus, + @RequestParam(required = false) String textSearch, + @RequestParam(required = false) String sortProperty, + @RequestParam(required = false) String sortOrder) throws ThingsboardException { + checkParameter("DeviceId", strDeviceId); + try { + TenantId tenantId = getCurrentUser().getTenantId(); + PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder); + DeviceId deviceId = new DeviceId(UUID.fromString(strDeviceId)); + return checkNotNull(rpcService.findAllByDeviceIdAndStatus(tenantId, deviceId, rpcStatus, pageLink)); + } catch (Exception e) { + throw handleException(e); + } + } + + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") + @RequestMapping(value = "/persisted/{rpcId}", method = RequestMethod.DELETE) + @ResponseBody + public void deleteResource(@PathVariable("rpcId") String strRpc) throws ThingsboardException { + checkParameter("RpcId", strRpc); + try { + rpcService.deleteRpc(getTenantId(), new RpcId(UUID.fromString(strRpc))); + } catch (Exception e) { + throw handleException(e); + } + } + private DeferredResult handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody) throws ThingsboardException { try { JsonNode rpcRequestBody = jsonMapper.readTree(requestBody); diff --git a/application/src/main/java/org/thingsboard/server/service/action/RuleEngineEntityActionService.java b/application/src/main/java/org/thingsboard/server/service/action/RuleEngineEntityActionService.java index 754d32b704..f1320d1a7a 100644 --- a/application/src/main/java/org/thingsboard/server/service/action/RuleEngineEntityActionService.java +++ b/application/src/main/java/org/thingsboard/server/service/action/RuleEngineEntityActionService.java @@ -54,7 +54,7 @@ public class RuleEngineEntityActionService { private static final ObjectMapper json = new ObjectMapper(); - public void pushEntityActionToRuleEngine(EntityId entityId, Object entity, TenantId tenantId, CustomerId customerId, + public void pushEntityActionToRuleEngine(EntityId entityId, HasName entity, TenantId tenantId, CustomerId customerId, ActionType actionType, User user, Object... additionalInfo) { String msgType = null; switch (actionType) { diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/TbRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/TbRpcService.java index d410f12b1f..e0343485db 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/TbRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/TbRpcService.java @@ -41,9 +41,10 @@ public class TbRpcService { private final RpcService rpcService; private final TbClusterService tbClusterService; - public void save(TenantId tenantId, Rpc rpc) { - Rpc saved = rpcService.save(tenantId, rpc); + public Rpc save(TenantId tenantId, Rpc rpc) { + Rpc saved = rpcService.save(rpc); pushRpcMsgToRuleEngine(tenantId, saved); + return saved; } public void save(TenantId tenantId, RpcId rpcId, RpcStatus newStatus, JsonNode response) { @@ -53,7 +54,7 @@ public class TbRpcService { if (response != null) { foundRpc.setResponse(response); } - Rpc saved = rpcService.save(tenantId, foundRpc); + Rpc saved = rpcService.save(foundRpc); pushRpcMsgToRuleEngine(tenantId, saved); } else { log.warn("[{}] Failed to update RPC status because RPC was already deleted", rpcId); @@ -69,8 +70,8 @@ public class TbRpcService { return rpcService.findById(tenantId, rpcId); } - public PageData findAllByDeviceIdAndStatus(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) { - return rpcService.findAllByDeviceIdAndStatus(deviceId, rpcStatus, pageLink); + public PageData findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) { + return rpcService.findAllByDeviceIdAndStatus(tenantId, deviceId, rpcStatus, pageLink); } } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/rpc/RpcService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/rpc/RpcService.java index 22446d143e..4bdb1a169d 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/rpc/RpcService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/rpc/RpcService.java @@ -25,13 +25,15 @@ import org.thingsboard.server.common.data.rpc.Rpc; import org.thingsboard.server.common.data.rpc.RpcStatus; public interface RpcService { - Rpc save(TenantId tenantId, Rpc rpc); + Rpc save(Rpc rpc); - void remove(TenantId tenantId, RpcId id); + void deleteRpc(TenantId tenantId, RpcId id); + + void deleteAllRpcByTenantId(TenantId tenantId); Rpc findById(TenantId tenantId, RpcId id); ListenableFuture findRpcByIdAsync(TenantId tenantId, RpcId id); - PageData findAllByDeviceIdAndStatus(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink); + PageData findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index de97bf324c..efc81846d5 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -77,7 +77,6 @@ public class DataConstants { public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE"; public static final String RPC_QUEUED = "RPC_QUEUED"; - public static final String RPC_SENT = "RPC_SENT"; public static final String RPC_DELIVERED = "RPC_DELIVERED"; public static final String RPC_SUCCESSFUL = "RPC_SUCCESSFUL"; public static final String RPC_TIMEOUT = "RPC_TIMEOUT"; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java b/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java index fea9e30a75..c80d0c5993 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java @@ -16,5 +16,5 @@ package org.thingsboard.server.common.data.rpc; public enum RpcStatus { - QUEUED, SENT, DELIVERED, SUCCESSFUL, TIMEOUT, FAILED + QUEUED, DELIVERED, SUCCESSFUL, TIMEOUT, FAILED } diff --git a/dao/src/main/java/org/thingsboard/server/dao/rpc/BaseRpcService.java b/dao/src/main/java/org/thingsboard/server/dao/rpc/BaseRpcService.java index 577263a3bd..02b4bbe433 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rpc/BaseRpcService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rpc/BaseRpcService.java @@ -26,35 +26,75 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.rpc.Rpc; import org.thingsboard.server.common.data.rpc.RpcStatus; +import org.thingsboard.server.dao.service.PaginatedRemover; + +import static org.thingsboard.server.dao.service.Validator.validateId; +import static org.thingsboard.server.dao.service.Validator.validatePageLink; @Service @Slf4j @RequiredArgsConstructor public class BaseRpcService implements RpcService { + public static final String INCORRECT_TENANT_ID = "Incorrect tenantId "; + public static final String INCORRECT_RPC_ID = "Incorrect rpcId "; + private final RpcDao rpcDao; @Override - public Rpc save(TenantId tenantId, Rpc rpc) { - return rpcDao.save(tenantId, rpc); + public Rpc save(Rpc rpc) { + log.trace("Executing save, [{}]", rpc); + return rpcDao.save(rpc.getTenantId(), rpc); } @Override - public void remove(TenantId tenantId, RpcId id) { - rpcDao.removeById(tenantId, id.getId()); + public void deleteRpc(TenantId tenantId, RpcId rpcId) { + log.trace("Executing deleteRpc, tenantId [{}], rpcId [{}]", tenantId, rpcId); + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); + validateId(rpcId, INCORRECT_RPC_ID + rpcId); + rpcDao.removeById(tenantId, rpcId.getId()); } @Override - public Rpc findById(TenantId tenantId, RpcId id) { - return rpcDao.findById(tenantId, id.getId()); + public void deleteAllRpcByTenantId(TenantId tenantId) { + log.trace("Executing deleteAllRpcByTenantId, tenantId [{}]", tenantId); + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); + tenantRpcRemover.removeEntities(tenantId, tenantId); } @Override - public ListenableFuture findRpcByIdAsync(TenantId tenantId, RpcId id) { - return rpcDao.findByIdAsync(tenantId, id.getId()); + public Rpc findById(TenantId tenantId, RpcId rpcId) { + log.trace("Executing findById, tenantId [{}], rpcId [{}]", tenantId, rpcId); + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); + validateId(rpcId, INCORRECT_RPC_ID + rpcId); + return rpcDao.findById(tenantId, rpcId.getId()); } @Override - public PageData findAllByDeviceIdAndStatus(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) { - return rpcDao.findAllByDeviceId(deviceId, rpcStatus, pageLink); + public ListenableFuture findRpcByIdAsync(TenantId tenantId, RpcId rpcId) { + log.trace("Executing findRpcByIdAsync, tenantId [{}], rpcId: [{}]", tenantId, rpcId); + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); + validateId(rpcId, INCORRECT_RPC_ID + rpcId); + return rpcDao.findByIdAsync(tenantId, rpcId.getId()); } + + @Override + public PageData findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) { + log.trace("Executing findAllByDeviceIdAndStatus, tenantId [{}], deviceId [{}], rpcStatus [{}], pageLink [{}]", tenantId, deviceId, rpcStatus, pageLink); + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); + validatePageLink(pageLink); + return rpcDao.findAllByDeviceId(tenantId, deviceId, rpcStatus, pageLink); + } + + private PaginatedRemover tenantRpcRemover = + new PaginatedRemover<>() { + @Override + protected PageData findEntities(TenantId tenantId, TenantId id, PageLink pageLink) { + return rpcDao.findAllRpcByTenantId(id, pageLink); + } + + @Override + protected void removeEntity(TenantId tenantId, Rpc entity) { + deleteRpc(tenantId, entity.getId()); + } + }; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java b/dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java index a3aa32720a..63af784dbb 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java @@ -24,7 +24,9 @@ import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.dao.Dao; public interface RpcDao extends Dao { - PageData findAllByDeviceId(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink); + PageData findAllByDeviceId(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink); + + PageData findAllRpcByTenantId(TenantId tenantId, PageLink pageLink); Long deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDao.java index 22fc3396f7..221ef17361 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDao.java @@ -49,10 +49,14 @@ public class JpaRpcDao extends JpaAbstractDao implements RpcDao return rpcRepository; } + @Override + public PageData findAllByDeviceId(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) { + return DaoUtil.toPageData(rpcRepository.findAllByTenantIdAndDeviceIdAndStatus(tenantId.getId(), deviceId.getId(), rpcStatus, DaoUtil.toPageable(pageLink))); + } @Override - public PageData findAllByDeviceId(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) { - return DaoUtil.toPageData(rpcRepository.findAllByDeviceIdAndStatus(deviceId.getId(), rpcStatus, DaoUtil.toPageable(pageLink))); + public PageData findAllRpcByTenantId(TenantId tenantId, PageLink pageLink) { + return DaoUtil.toPageData(rpcRepository.findAllByTenantId(tenantId.getId(), DaoUtil.toPageable(pageLink))); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java index 565d87e627..76b67b3823 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java @@ -26,7 +26,9 @@ import org.thingsboard.server.dao.model.sql.RpcEntity; import java.util.UUID; public interface RpcRepository extends CrudRepository { - Page findAllByDeviceIdAndStatus(UUID deviceId, RpcStatus status, Pageable pageable); + Page findAllByTenantIdAndDeviceIdAndStatus(UUID tenantId, UUID deviceId, RpcStatus status, Pageable pageable); + + Page findAllByTenantId(UUID tenantId, Pageable pageable); @Query(value = "WITH deleted AS (DELETE FROM rpc WHERE (tenant_id = :tenantId AND created_time < :expirationTime) IS TRUE RETURNING *) SELECT count(*) FROM deleted", nativeQuery = true) diff --git a/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java index c5050467dd..eab0a070f5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java @@ -37,6 +37,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.ota.OtaPackageService; import org.thingsboard.server.dao.resource.ResourceService; +import org.thingsboard.server.dao.rpc.RpcService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.PaginatedRemover; @@ -96,6 +97,9 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe @Autowired private OtaPackageService otaPackageService; + @Autowired + private RpcService rpcService; + @Override public Tenant findTenantById(TenantId tenantId) { log.trace("Executing findTenantById [{}]", tenantId); @@ -151,6 +155,7 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe apiUsageStateService.deleteApiUsageStateByTenantId(tenantId); resourceService.deleteResourcesByTenantId(tenantId); otaPackageService.deleteOtaPackagesByTenantId(tenantId); + rpcService.deleteAllRpcByTenantId(tenantId); tenantDao.removeById(tenantId, tenantId.getId()); deleteEntityRelations(tenantId, tenantId); } diff --git a/dao/src/main/resources/sql/schema-entities-idx.sql b/dao/src/main/resources/sql/schema-entities-idx.sql index 28bbe4311e..d649e44418 100644 --- a/dao/src/main/resources/sql/schema-entities-idx.sql +++ b/dao/src/main/resources/sql/schema-entities-idx.sql @@ -46,3 +46,4 @@ CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribu CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time); +CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java index 235bdcfa44..85c2a156b3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java @@ -33,7 +33,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; type = ComponentType.FILTER, name = "message type switch", configClazz = EmptyNodeConfiguration.class, - relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Sent", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed", + relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed", "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned", "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant", "Timeseries Updated", "Timeseries Deleted"}, @@ -97,8 +97,6 @@ public class TbMsgTypeSwitchNode implements TbNode { relationType = "Timeseries Deleted"; } else if (msg.getType().equals(DataConstants.RPC_QUEUED)) { relationType = "RPC Queued"; - } else if (msg.getType().equals(DataConstants.RPC_SENT)) { - relationType = "RPC Sent"; } else if (msg.getType().equals(DataConstants.RPC_DELIVERED)) { relationType = "RPC Delivered"; } else if (msg.getType().equals(DataConstants.RPC_SUCCESSFUL)) { diff --git a/ui-ngx/src/app/shared/models/rule-node.models.ts b/ui-ngx/src/app/shared/models/rule-node.models.ts index 49c3f18bf8..489c6eef10 100644 --- a/ui-ngx/src/app/shared/models/rule-node.models.ts +++ b/ui-ngx/src/app/shared/models/rule-node.models.ts @@ -354,9 +354,8 @@ export enum MessageType { TIMESERIES_UPDATED = 'TIMESERIES_UPDATED', TIMESERIES_DELETED = 'TIMESERIES_DELETED', RPC_QUEUED = 'RPC_QUEUED', - RPC_SENT = 'RPC_SENT', - RPC_DELIVERED = 'RPC_SENT', - RPC_SUCCESSFUL = 'RPC_DELIVERED', + RPC_DELIVERED = 'RPC_DELIVERED', + RPC_SUCCESSFUL = 'RPC_SUCCESSFUL', RPC_TIMEOUT = 'RPC_TIMEOUT', RPC_FAILED = 'RPC_FAILED' } @@ -381,7 +380,6 @@ export const messageTypeNames = new Map( [MessageType.TIMESERIES_UPDATED, 'Timeseries Updated'], [MessageType.TIMESERIES_DELETED, 'Timeseries Deleted'], [MessageType.RPC_QUEUED, 'RPC Queued'], - [MessageType.RPC_SENT, 'RPC Sent'], [MessageType.RPC_DELIVERED, 'RPC Delivered'], [MessageType.RPC_SUCCESSFUL, 'RPC Successful'], [MessageType.RPC_TIMEOUT, 'RPC Timeout'],