removed rpc status sent, added idx_rpc_tenant_id_device_id

This commit is contained in:
YevhenBondarenko 2021-06-14 15:06:21 +03:00
parent 6b546a459e
commit e58d5b2d8a
16 changed files with 147 additions and 62 deletions

View File

@ -209,3 +209,5 @@ CREATE TABLE IF NOT EXISTS rpc (
status varchar(255) NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id);

View File

@ -174,20 +174,15 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
long timeout = request.getExpirationTime() - System.currentTimeMillis();
boolean persisted = request.isPersisted();
if (timeout <= 0) {
log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime());
if (persisted) {
Rpc rpc = new Rpc(new RpcId(request.getId()));
rpc.setCreatedTime(System.currentTimeMillis());
rpc.setTenantId(tenantId);
rpc.setDeviceId(deviceId);
rpc.setExpirationTime(request.getExpirationTime());
rpc.setRequest(JacksonUtil.valueToTree(request));
rpc.setStatus(RpcStatus.TIMEOUT);
systemContext.getTbRpcService().save(tenantId, rpc);
createRpc(request, RpcStatus.TIMEOUT);
}
return;
} else if (persisted) {
createRpc(request, RpcStatus.QUEUED);
}
boolean sent;
@ -204,24 +199,14 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
syncSessionSet.add(key);
}
});
log.trace("46) Rpc syncSessionSet [{}] subscription after sent [{}]",syncSessionSet, rpcSubscriptions);
log.trace("46) Rpc syncSessionSet [{}] subscription after sent [{}]", syncSessionSet, rpcSubscriptions);
syncSessionSet.forEach(rpcSubscriptions::remove);
}
if (persisted) {
Rpc rpc = new Rpc(new RpcId(request.getId()));
rpc.setCreatedTime(System.currentTimeMillis());
rpc.setTenantId(tenantId);
rpc.setDeviceId(deviceId);
rpc.setExpirationTime(request.getExpirationTime());
rpc.setRequest(JacksonUtil.valueToTree(request));
rpc.setStatus(sent ? RpcStatus.SENT : RpcStatus.QUEUED);
systemContext.getTbRpcService().save(tenantId, rpc);
if (!(sent || request.isOneway())) {
ObjectNode response = JacksonUtil.newObjectNode();
response.put("rpcId", request.getId().toString());
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), JacksonUtil.toString(response), null));
}
if (persisted && !(sent || request.isOneway())) {
ObjectNode response = JacksonUtil.newObjectNode();
response.put("rpcId", request.getId().toString());
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), JacksonUtil.toString(response), null));
}
if (request.isOneway() && sent) {
@ -237,6 +222,18 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
}
}
private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) {
Rpc rpc = new Rpc(new RpcId(request.getId()));
rpc.setCreatedTime(System.currentTimeMillis());
rpc.setTenantId(tenantId);
rpc.setDeviceId(deviceId);
rpc.setExpirationTime(request.getExpirationTime());
rpc.setRequest(JacksonUtil.valueToTree(request));
rpc.setStatus(status);
systemContext.getTbRpcService().save(tenantId, rpc);
return systemContext.getTbRpcService().save(tenantId, rpc);
}
private ToDeviceRpcRequestMsg creteToDeviceRpcRequestMsg(ToDeviceRpcRequest request) {
ToDeviceRpcRequestBody body = request.getBody();
return ToDeviceRpcRequestMsg.newBuilder()
@ -318,9 +315,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
.setPersisted(request.isPersisted())
.build();
if (request.isPersisted()) {
systemContext.getTbRpcService().save(tenantId, new RpcId(request.getId()), RpcStatus.SENT, null);
}
sendToTransport(rpcRequest, sessionId, nodeId);
};
}
@ -821,10 +815,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
void init(TbActorCtx ctx) {
schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout());
PageLink pageLink = new PageLink(10);
PageLink pageLink = new PageLink(1024);
PageData<Rpc> pageData;
do {
pageData = systemContext.getTbRpcService().findAllByDeviceIdAndStatus(deviceId, RpcStatus.QUEUED, pageLink);
pageData = systemContext.getTbRpcService().findAllByDeviceIdAndStatus(tenantId, deviceId, RpcStatus.QUEUED, pageLink);
pageData.getData().forEach(rpc -> {
ToDeviceRpcRequest msg = JacksonUtil.convertValue(rpc.getRequest(), ToDeviceRpcRequest.class);
long timeout = rpc.getExpirationTime() - System.currentTimeMillis();

View File

@ -29,6 +29,7 @@ import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
@ -41,7 +42,10 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.queue.util.TbCoreComponent;
@ -95,7 +99,7 @@ public class RpcController extends BaseController {
return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody);
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/persisted/{rpcId}", method = RequestMethod.GET)
@ResponseBody
public Rpc getPersistedRpc(@PathVariable("rpcId") String strRpc) throws ThingsboardException {
@ -108,6 +112,39 @@ public class RpcController extends BaseController {
}
}
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/persisted/{deviceId}", method = RequestMethod.GET)
@ResponseBody
public PageData<Rpc> getPersistedRpcByDevice(@PathVariable("deviceId") String strDeviceId,
@RequestParam int pageSize,
@RequestParam int page,
@RequestParam RpcStatus rpcStatus,
@RequestParam(required = false) String textSearch,
@RequestParam(required = false) String sortProperty,
@RequestParam(required = false) String sortOrder) throws ThingsboardException {
checkParameter("DeviceId", strDeviceId);
try {
TenantId tenantId = getCurrentUser().getTenantId();
PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder);
DeviceId deviceId = new DeviceId(UUID.fromString(strDeviceId));
return checkNotNull(rpcService.findAllByDeviceIdAndStatus(tenantId, deviceId, rpcStatus, pageLink));
} catch (Exception e) {
throw handleException(e);
}
}
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
@RequestMapping(value = "/persisted/{rpcId}", method = RequestMethod.DELETE)
@ResponseBody
public void deleteResource(@PathVariable("rpcId") String strRpc) throws ThingsboardException {
checkParameter("RpcId", strRpc);
try {
rpcService.deleteRpc(getTenantId(), new RpcId(UUID.fromString(strRpc)));
} catch (Exception e) {
throw handleException(e);
}
}
private DeferredResult<ResponseEntity> handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody) throws ThingsboardException {
try {
JsonNode rpcRequestBody = jsonMapper.readTree(requestBody);

View File

@ -54,7 +54,7 @@ public class RuleEngineEntityActionService {
private static final ObjectMapper json = new ObjectMapper();
public void pushEntityActionToRuleEngine(EntityId entityId, Object entity, TenantId tenantId, CustomerId customerId,
public void pushEntityActionToRuleEngine(EntityId entityId, HasName entity, TenantId tenantId, CustomerId customerId,
ActionType actionType, User user, Object... additionalInfo) {
String msgType = null;
switch (actionType) {

View File

@ -41,9 +41,10 @@ public class TbRpcService {
private final RpcService rpcService;
private final TbClusterService tbClusterService;
public void save(TenantId tenantId, Rpc rpc) {
Rpc saved = rpcService.save(tenantId, rpc);
public Rpc save(TenantId tenantId, Rpc rpc) {
Rpc saved = rpcService.save(rpc);
pushRpcMsgToRuleEngine(tenantId, saved);
return saved;
}
public void save(TenantId tenantId, RpcId rpcId, RpcStatus newStatus, JsonNode response) {
@ -53,7 +54,7 @@ public class TbRpcService {
if (response != null) {
foundRpc.setResponse(response);
}
Rpc saved = rpcService.save(tenantId, foundRpc);
Rpc saved = rpcService.save(foundRpc);
pushRpcMsgToRuleEngine(tenantId, saved);
} else {
log.warn("[{}] Failed to update RPC status because RPC was already deleted", rpcId);
@ -69,8 +70,8 @@ public class TbRpcService {
return rpcService.findById(tenantId, rpcId);
}
public PageData<Rpc> findAllByDeviceIdAndStatus(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) {
return rpcService.findAllByDeviceIdAndStatus(deviceId, rpcStatus, pageLink);
public PageData<Rpc> findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) {
return rpcService.findAllByDeviceIdAndStatus(tenantId, deviceId, rpcStatus, pageLink);
}
}

View File

@ -25,13 +25,15 @@ import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcStatus;
public interface RpcService {
Rpc save(TenantId tenantId, Rpc rpc);
Rpc save(Rpc rpc);
void remove(TenantId tenantId, RpcId id);
void deleteRpc(TenantId tenantId, RpcId id);
void deleteAllRpcByTenantId(TenantId tenantId);
Rpc findById(TenantId tenantId, RpcId id);
ListenableFuture<Rpc> findRpcByIdAsync(TenantId tenantId, RpcId id);
PageData<Rpc> findAllByDeviceIdAndStatus(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink);
PageData<Rpc> findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink);
}

View File

@ -77,7 +77,6 @@ public class DataConstants {
public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE";
public static final String RPC_QUEUED = "RPC_QUEUED";
public static final String RPC_SENT = "RPC_SENT";
public static final String RPC_DELIVERED = "RPC_DELIVERED";
public static final String RPC_SUCCESSFUL = "RPC_SUCCESSFUL";
public static final String RPC_TIMEOUT = "RPC_TIMEOUT";

View File

@ -16,5 +16,5 @@
package org.thingsboard.server.common.data.rpc;
public enum RpcStatus {
QUEUED, SENT, DELIVERED, SUCCESSFUL, TIMEOUT, FAILED
QUEUED, DELIVERED, SUCCESSFUL, TIMEOUT, FAILED
}

View File

@ -26,35 +26,75 @@ import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.dao.service.PaginatedRemover;
import static org.thingsboard.server.dao.service.Validator.validateId;
import static org.thingsboard.server.dao.service.Validator.validatePageLink;
@Service
@Slf4j
@RequiredArgsConstructor
public class BaseRpcService implements RpcService {
public static final String INCORRECT_TENANT_ID = "Incorrect tenantId ";
public static final String INCORRECT_RPC_ID = "Incorrect rpcId ";
private final RpcDao rpcDao;
@Override
public Rpc save(TenantId tenantId, Rpc rpc) {
return rpcDao.save(tenantId, rpc);
public Rpc save(Rpc rpc) {
log.trace("Executing save, [{}]", rpc);
return rpcDao.save(rpc.getTenantId(), rpc);
}
@Override
public void remove(TenantId tenantId, RpcId id) {
rpcDao.removeById(tenantId, id.getId());
public void deleteRpc(TenantId tenantId, RpcId rpcId) {
log.trace("Executing deleteRpc, tenantId [{}], rpcId [{}]", tenantId, rpcId);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
validateId(rpcId, INCORRECT_RPC_ID + rpcId);
rpcDao.removeById(tenantId, rpcId.getId());
}
@Override
public Rpc findById(TenantId tenantId, RpcId id) {
return rpcDao.findById(tenantId, id.getId());
public void deleteAllRpcByTenantId(TenantId tenantId) {
log.trace("Executing deleteAllRpcByTenantId, tenantId [{}]", tenantId);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
tenantRpcRemover.removeEntities(tenantId, tenantId);
}
@Override
public ListenableFuture<Rpc> findRpcByIdAsync(TenantId tenantId, RpcId id) {
return rpcDao.findByIdAsync(tenantId, id.getId());
public Rpc findById(TenantId tenantId, RpcId rpcId) {
log.trace("Executing findById, tenantId [{}], rpcId [{}]", tenantId, rpcId);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
validateId(rpcId, INCORRECT_RPC_ID + rpcId);
return rpcDao.findById(tenantId, rpcId.getId());
}
@Override
public PageData<Rpc> findAllByDeviceIdAndStatus(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) {
return rpcDao.findAllByDeviceId(deviceId, rpcStatus, pageLink);
public ListenableFuture<Rpc> findRpcByIdAsync(TenantId tenantId, RpcId rpcId) {
log.trace("Executing findRpcByIdAsync, tenantId [{}], rpcId: [{}]", tenantId, rpcId);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
validateId(rpcId, INCORRECT_RPC_ID + rpcId);
return rpcDao.findByIdAsync(tenantId, rpcId.getId());
}
@Override
public PageData<Rpc> findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) {
log.trace("Executing findAllByDeviceIdAndStatus, tenantId [{}], deviceId [{}], rpcStatus [{}], pageLink [{}]", tenantId, deviceId, rpcStatus, pageLink);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
validatePageLink(pageLink);
return rpcDao.findAllByDeviceId(tenantId, deviceId, rpcStatus, pageLink);
}
private PaginatedRemover<TenantId, Rpc> tenantRpcRemover =
new PaginatedRemover<>() {
@Override
protected PageData<Rpc> findEntities(TenantId tenantId, TenantId id, PageLink pageLink) {
return rpcDao.findAllRpcByTenantId(id, pageLink);
}
@Override
protected void removeEntity(TenantId tenantId, Rpc entity) {
deleteRpc(tenantId, entity.getId());
}
};
}

View File

@ -24,7 +24,9 @@ import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.dao.Dao;
public interface RpcDao extends Dao<Rpc> {
PageData<Rpc> findAllByDeviceId(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink);
PageData<Rpc> findAllByDeviceId(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink);
PageData<Rpc> findAllRpcByTenantId(TenantId tenantId, PageLink pageLink);
Long deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime);
}

View File

@ -49,10 +49,14 @@ public class JpaRpcDao extends JpaAbstractDao<RpcEntity, Rpc> implements RpcDao
return rpcRepository;
}
@Override
public PageData<Rpc> findAllByDeviceId(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) {
return DaoUtil.toPageData(rpcRepository.findAllByTenantIdAndDeviceIdAndStatus(tenantId.getId(), deviceId.getId(), rpcStatus, DaoUtil.toPageable(pageLink)));
}
@Override
public PageData<Rpc> findAllByDeviceId(DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) {
return DaoUtil.toPageData(rpcRepository.findAllByDeviceIdAndStatus(deviceId.getId(), rpcStatus, DaoUtil.toPageable(pageLink)));
public PageData<Rpc> findAllRpcByTenantId(TenantId tenantId, PageLink pageLink) {
return DaoUtil.toPageData(rpcRepository.findAllByTenantId(tenantId.getId(), DaoUtil.toPageable(pageLink)));
}
@Override

View File

@ -26,7 +26,9 @@ import org.thingsboard.server.dao.model.sql.RpcEntity;
import java.util.UUID;
public interface RpcRepository extends CrudRepository<RpcEntity, UUID> {
Page<RpcEntity> findAllByDeviceIdAndStatus(UUID deviceId, RpcStatus status, Pageable pageable);
Page<RpcEntity> findAllByTenantIdAndDeviceIdAndStatus(UUID tenantId, UUID deviceId, RpcStatus status, Pageable pageable);
Page<RpcEntity> findAllByTenantId(UUID tenantId, Pageable pageable);
@Query(value = "WITH deleted AS (DELETE FROM rpc WHERE (tenant_id = :tenantId AND created_time < :expirationTime) IS TRUE RETURNING *) SELECT count(*) FROM deleted",
nativeQuery = true)

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.ota.OtaPackageService;
import org.thingsboard.server.dao.resource.ResourceService;
import org.thingsboard.server.dao.rpc.RpcService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.service.PaginatedRemover;
@ -96,6 +97,9 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe
@Autowired
private OtaPackageService otaPackageService;
@Autowired
private RpcService rpcService;
@Override
public Tenant findTenantById(TenantId tenantId) {
log.trace("Executing findTenantById [{}]", tenantId);
@ -151,6 +155,7 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe
apiUsageStateService.deleteApiUsageStateByTenantId(tenantId);
resourceService.deleteResourcesByTenantId(tenantId);
otaPackageService.deleteOtaPackagesByTenantId(tenantId);
rpcService.deleteAllRpcByTenantId(tenantId);
tenantDao.removeById(tenantId, tenantId.getId());
deleteEntityRelations(tenantId, tenantId);
}

View File

@ -46,3 +46,4 @@ CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribu
CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time);
CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id);

View File

@ -33,7 +33,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
type = ComponentType.FILTER,
name = "message type switch",
configClazz = EmptyNodeConfiguration.class,
relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Sent", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed",
relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed",
"Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned",
"Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant",
"Timeseries Updated", "Timeseries Deleted"},
@ -97,8 +97,6 @@ public class TbMsgTypeSwitchNode implements TbNode {
relationType = "Timeseries Deleted";
} else if (msg.getType().equals(DataConstants.RPC_QUEUED)) {
relationType = "RPC Queued";
} else if (msg.getType().equals(DataConstants.RPC_SENT)) {
relationType = "RPC Sent";
} else if (msg.getType().equals(DataConstants.RPC_DELIVERED)) {
relationType = "RPC Delivered";
} else if (msg.getType().equals(DataConstants.RPC_SUCCESSFUL)) {

View File

@ -354,9 +354,8 @@ export enum MessageType {
TIMESERIES_UPDATED = 'TIMESERIES_UPDATED',
TIMESERIES_DELETED = 'TIMESERIES_DELETED',
RPC_QUEUED = 'RPC_QUEUED',
RPC_SENT = 'RPC_SENT',
RPC_DELIVERED = 'RPC_SENT',
RPC_SUCCESSFUL = 'RPC_DELIVERED',
RPC_DELIVERED = 'RPC_DELIVERED',
RPC_SUCCESSFUL = 'RPC_SUCCESSFUL',
RPC_TIMEOUT = 'RPC_TIMEOUT',
RPC_FAILED = 'RPC_FAILED'
}
@ -381,7 +380,6 @@ export const messageTypeNames = new Map<MessageType, string>(
[MessageType.TIMESERIES_UPDATED, 'Timeseries Updated'],
[MessageType.TIMESERIES_DELETED, 'Timeseries Deleted'],
[MessageType.RPC_QUEUED, 'RPC Queued'],
[MessageType.RPC_SENT, 'RPC Sent'],
[MessageType.RPC_DELIVERED, 'RPC Delivered'],
[MessageType.RPC_SUCCESSFUL, 'RPC Successful'],
[MessageType.RPC_TIMEOUT, 'RPC Timeout'],