diff --git a/application/src/main/data/upgrade/3.2.2/schema_update.sql b/application/src/main/data/upgrade/3.2.2/schema_update.sql index 8020046a7b..bda87bbb6b 100644 --- a/application/src/main/data/upgrade/3.2.2/schema_update.sql +++ b/application/src/main/data/upgrade/3.2.2/schema_update.sql @@ -197,3 +197,17 @@ $$; ALTER TABLE api_usage_state ADD COLUMN IF NOT EXISTS alarm_exec VARCHAR(32); UPDATE api_usage_state SET alarm_exec = 'ENABLED' WHERE alarm_exec IS NULL; + +CREATE TABLE IF NOT EXISTS rpc ( + id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY, + created_time bigint NOT NULL, + tenant_id uuid NOT NULL, + device_id uuid NOT NULL, + expiration_time bigint NOT NULL, + request varchar(10000000) NOT NULL, + response varchar(10000000), + status varchar(255) NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id); + diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index b52f85af92..5f97f3151f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -65,6 +65,7 @@ import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.resource.ResourceService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.rule.RuleNodeStateService; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; @@ -80,9 +81,9 @@ import org.thingsboard.server.service.executors.ExternalCallExecutorService; import org.thingsboard.server.service.executors.SharedEventLoopGroupService; import org.thingsboard.server.service.mail.MailExecutorService; import org.thingsboard.server.service.profile.TbDeviceProfileCache; -import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.service.queue.TbClusterService; import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; +import org.thingsboard.server.service.rpc.TbRpcService; import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; import org.thingsboard.server.service.script.JsExecutorService; import org.thingsboard.server.service.script.JsInvokeService; @@ -303,23 +304,33 @@ public class ActorSystemContext { @Lazy @Autowired(required = false) - @Getter private EdgeService edgeService; + @Getter + private EdgeService edgeService; @Lazy @Autowired(required = false) - @Getter private EdgeEventService edgeEventService; + @Getter + private EdgeEventService edgeEventService; @Lazy @Autowired(required = false) - @Getter private EdgeRpcService edgeRpcService; + @Getter + private EdgeRpcService edgeRpcService; @Lazy @Autowired(required = false) - @Getter private ResourceService resourceService; + @Getter + private ResourceService resourceService; @Lazy @Autowired(required = false) - @Getter private OtaPackageService otaPackageService; + @Getter + private OtaPackageService otaPackageService; + + @Lazy + @Autowired(required = false) + @Getter + private TbRpcService tbRpcService; @Value("${actors.session.max_concurrent_sessions_per_device:1}") @Getter diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java index 6e2761c9c5..3e06b8387c 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java @@ -46,7 +46,7 @@ public class DeviceActor extends ContextAwareActor { super.init(ctx); log.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId); try { - processor.initSessionTimeout(ctx); + processor.init(ctx); log.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId); } catch (Exception e) { log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.deviceId, e); diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 747a16a5e6..2d5f107d61 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.InvalidProtocolBufferException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.api.RpcError; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg; @@ -38,12 +39,17 @@ import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.RpcId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; +import org.thingsboard.server.common.data.rpc.Rpc; +import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentialsType; @@ -52,8 +58,8 @@ import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; @@ -68,10 +74,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType; import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.ToDevicePersistedRpcResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; @@ -162,20 +170,19 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { void processRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg) { ToDeviceRpcRequest request = msg.getMsg(); - ToDeviceRpcRequestBody body = request.getBody(); - ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder() - .setRequestId(rpcSeq++) - .setMethodName(body.getMethod()) - .setParams(body.getParams()) - .setExpirationTime(request.getExpirationTime()) - .setRequestIdMSB(request.getId().getMostSignificantBits()) - .setRequestIdLSB(request.getId().getLeastSignificantBits()) - .build(); + ToDeviceRpcRequestMsg rpcRequest = creteToDeviceRpcRequestMsg(request); long timeout = request.getExpirationTime() - System.currentTimeMillis(); + boolean persisted = request.isPersisted(); + if (timeout <= 0) { log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime()); + if (persisted) { + createRpc(request, RpcStatus.TIMEOUT); + } return; + } else if (persisted) { + createRpc(request, RpcStatus.QUEUED); } boolean sent; @@ -192,10 +199,16 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { syncSessionSet.add(key); } }); - log.trace("46) Rpc syncSessionSet [{}] subscription after sent [{}]",syncSessionSet, rpcSubscriptions); + log.trace("46) Rpc syncSessionSet [{}] subscription after sent [{}]", syncSessionSet, rpcSubscriptions); syncSessionSet.forEach(rpcSubscriptions::remove); } + if (persisted && !(sent || request.isOneway())) { + ObjectNode response = JacksonUtil.newObjectNode(); + response.put("rpcId", request.getId().toString()); + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), JacksonUtil.toString(response), null)); + } + if (request.isOneway() && sent) { log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId()); systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null)); @@ -209,6 +222,32 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } + private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) { + Rpc rpc = new Rpc(new RpcId(request.getId())); + rpc.setCreatedTime(System.currentTimeMillis()); + rpc.setTenantId(tenantId); + rpc.setDeviceId(deviceId); + rpc.setExpirationTime(request.getExpirationTime()); + rpc.setRequest(JacksonUtil.valueToTree(request)); + rpc.setStatus(status); + systemContext.getTbRpcService().save(tenantId, rpc); + return systemContext.getTbRpcService().save(tenantId, rpc); + } + + private ToDeviceRpcRequestMsg creteToDeviceRpcRequestMsg(ToDeviceRpcRequest request) { + ToDeviceRpcRequestBody body = request.getBody(); + return ToDeviceRpcRequestMsg.newBuilder() + .setRequestId(rpcSeq++) + .setMethodName(body.getMethod()) + .setParams(body.getParams()) + .setExpirationTime(request.getExpirationTime()) + .setRequestIdMSB(request.getId().getMostSignificantBits()) + .setRequestIdLSB(request.getId().getLeastSignificantBits()) + .setOneway(request.isOneway()) + .setPersisted(request.isPersisted()) + .build(); + } + void processRpcResponsesFromEdge(TbActorCtx context, FromDeviceRpcResponseActorMsg responseMsg) { log.debug("[{}] Processing rpc command response from edge session", deviceId); ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); @@ -230,6 +269,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId()); if (requestMd != null) { log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId()); + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.TIMEOUT, null); systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); } @@ -271,7 +311,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { .setExpirationTime(request.getExpirationTime()) .setRequestIdMSB(request.getId().getMostSignificantBits()) .setRequestIdLSB(request.getId().getLeastSignificantBits()) + .setOneway(request.isOneway()) + .setPersisted(request.isPersisted()) .build(); + sendToTransport(rpcRequest, sessionId, nodeId); }; } @@ -279,31 +322,39 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { void process(TbActorCtx context, TransportToDeviceActorMsgWrapper wrapper) { TransportToDeviceActorMsg msg = wrapper.getMsg(); TbCallback callback = wrapper.getCallback(); + var sessionInfo = msg.getSessionInfo(); + if (msg.hasSessionEvent()) { - processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent()); + processSessionStateMsgs(sessionInfo, msg.getSessionEvent()); } if (msg.hasSubscribeToAttributes()) { - processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToAttributes()); + processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToAttributes()); } if (msg.hasSubscribeToRPC()) { - processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToRPC()); + processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToRPC()); + } + if (msg.hasSendPendingRPC()) { + sendPendingRequests(context, getSessionId(sessionInfo), sessionInfo); } if (msg.hasGetAttributes()) { - handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes()); + handleGetAttributesRequest(context, sessionInfo, msg.getGetAttributes()); } if (msg.hasToDeviceRPCCallResponse()) { - processRpcResponses(context, msg.getSessionInfo(), msg.getToDeviceRPCCallResponse()); + processRpcResponses(context, sessionInfo, msg.getToDeviceRPCCallResponse()); } if (msg.hasSubscriptionInfo()) { - handleSessionActivity(context, msg.getSessionInfo(), msg.getSubscriptionInfo()); + handleSessionActivity(context, sessionInfo, msg.getSubscriptionInfo()); } if (msg.hasClaimDevice()) { - handleClaimDeviceMsg(context, msg.getSessionInfo(), msg.getClaimDevice()); + handleClaimDeviceMsg(context, sessionInfo, msg.getClaimDevice()); + } + if (msg.hasPersistedRpcResponseMsg()) { + processPersistedRpcResponses(context, sessionInfo, msg.getPersistedRpcResponseMsg()); } callback.onSuccess(); } - private void handleClaimDeviceMsg(TbActorCtx context, SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg) { + private void handleClaimDeviceMsg(TbActorCtx context, SessionInfoProto sessionInfo, ClaimDeviceMsg msg) { DeviceId deviceId = new DeviceId(new UUID(msg.getDeviceIdMSB(), msg.getDeviceIdLSB())); systemContext.getClaimDevicesService().registerClaimingInfo(tenantId, deviceId, msg.getSecretKey(), msg.getDurationMs()); } @@ -442,11 +493,22 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { if (success) { systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), responseMsg.getPayload(), null)); + if (requestMd.getMsg().getMsg().isPersisted()) { + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.SUCCESSFUL, JacksonUtil.toJsonNode(responseMsg.getPayload())); + } } else { log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId()); + if (requestMd.getMsg().getMsg().isPersisted()) { + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.FAILED, JacksonUtil.toJsonNode(responseMsg.getPayload())); + } } } + private void processPersistedRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDevicePersistedRpcResponseMsg responseMsg) { + UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB()); + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), RpcStatus.valueOf(responseMsg.getStatus()), null); + } + private void processSubscriptionCommands(TbActorCtx context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) { UUID sessionId = getSessionId(sessionInfo); if (subscribeCmd.getUnsubscribe()) { @@ -565,7 +627,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { void notifyTransportAboutProfileUpdate(UUID sessionId, SessionInfoMetaData sessionMd, DeviceCredentials deviceCredentials) { log.info("2) LwM2Mtype: "); - TransportProtos.ToTransportUpdateCredentialsProto.Builder notification = TransportProtos.ToTransportUpdateCredentialsProto.newBuilder(); + ToTransportUpdateCredentialsProto.Builder notification = ToTransportUpdateCredentialsProto.newBuilder(); notification.addCredentialsId(deviceCredentials.getCredentialsId()); notification.addCredentialsValue(deviceCredentials.getCredentialsValue()); ToTransportMsg msg = ToTransportMsg.newBuilder() @@ -640,7 +702,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { ListenableFuture future = systemContext.getEdgeEventService().saveAsync(edgeEvent); Futures.addCallback(future, new FutureCallback() { @Override - public void onSuccess( EdgeEvent result) { + public void onSuccess(EdgeEvent result) { systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId); } @@ -756,8 +818,26 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { .addAllSessions(sessionsList).build().toByteArray()); } - void initSessionTimeout(TbActorCtx ctx) { + void init(TbActorCtx ctx) { schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout()); + PageLink pageLink = new PageLink(1024); + PageData pageData; + do { + pageData = systemContext.getTbRpcService().findAllByDeviceIdAndStatus(tenantId, deviceId, RpcStatus.QUEUED, pageLink); + pageData.getData().forEach(rpc -> { + ToDeviceRpcRequest msg = JacksonUtil.convertValue(rpc.getRequest(), ToDeviceRpcRequest.class); + long timeout = rpc.getExpirationTime() - System.currentTimeMillis(); + if (timeout <= 0) { + rpc.setStatus(RpcStatus.TIMEOUT); + systemContext.getTbRpcService().save(tenantId, rpc); + } else { + registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, creteToDeviceRpcRequestMsg(msg), timeout); + } + }); + if (pageData.hasNext()) { + pageLink = pageLink.nextPageLink(); + } + } while (pageData.hasNext()); } void checkSessionsTimeout() { diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 7d41deb2ab..a731e806c1 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -69,6 +69,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.OtaPackageId; +import org.thingsboard.server.common.data.id.RpcId; import org.thingsboard.server.common.data.id.TbResourceId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; @@ -83,6 +84,7 @@ import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.plugin.ComponentDescriptor; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.rpc.Rpc; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.common.data.rule.RuleNode; @@ -106,6 +108,7 @@ import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.oauth2.OAuth2ConfigTemplateService; import org.thingsboard.server.dao.oauth2.OAuth2Service; import org.thingsboard.server.dao.relation.RelationService; +import org.thingsboard.server.dao.rpc.RpcService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantProfileService; @@ -245,6 +248,9 @@ public abstract class BaseController { @Autowired protected OtaPackageStateService otaPackageStateService; + @Autowired + protected RpcService rpcService; + @Autowired protected TbQueueProducerProvider producerProvider; @@ -786,6 +792,18 @@ public abstract class BaseController { } } + Rpc checkRpcId(RpcId rpcId, Operation operation) throws ThingsboardException { + try { + validateId(rpcId, "Incorrect rpcId " + rpcId); + Rpc rpc = rpcService.findById(getCurrentUser().getTenantId(), rpcId); + checkNotNull(rpc); + accessControlService.checkPermission(getCurrentUser(), Resource.RPC, operation, rpcId, rpc); + return rpc; + } catch (Exception e) { + throw handleException(e, false); + } + } + @SuppressWarnings("unchecked") protected I emptyId(EntityType entityType) { return (I) EntityIdFactory.getByTypeAndUuid(entityType, ModelConstants.NULL_UUID); diff --git a/application/src/main/java/org/thingsboard/server/controller/RpcController.java b/application/src/main/java/org/thingsboard/server/controller/RpcController.java index e71ffbc6d2..eae26542fa 100644 --- a/application/src/main/java/org/thingsboard/server/controller/RpcController.java +++ b/application/src/main/java/org/thingsboard/server/controller/RpcController.java @@ -29,6 +29,7 @@ import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; @@ -38,8 +39,13 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.RpcId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UUIDBased; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.rpc.Rpc; +import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.queue.util.TbCoreComponent; @@ -93,6 +99,52 @@ public class RpcController extends BaseController { return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody); } + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") + @RequestMapping(value = "/persisted/{rpcId}", method = RequestMethod.GET) + @ResponseBody + public Rpc getPersistedRpc(@PathVariable("rpcId") String strRpc) throws ThingsboardException { + checkParameter("RpcId", strRpc); + try { + RpcId rpcId = new RpcId(UUID.fromString(strRpc)); + return checkRpcId(rpcId, Operation.READ); + } catch (Exception e) { + throw handleException(e); + } + } + + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") + @RequestMapping(value = "/persisted/{deviceId}", method = RequestMethod.GET) + @ResponseBody + public PageData getPersistedRpcByDevice(@PathVariable("deviceId") String strDeviceId, + @RequestParam int pageSize, + @RequestParam int page, + @RequestParam RpcStatus rpcStatus, + @RequestParam(required = false) String textSearch, + @RequestParam(required = false) String sortProperty, + @RequestParam(required = false) String sortOrder) throws ThingsboardException { + checkParameter("DeviceId", strDeviceId); + try { + TenantId tenantId = getCurrentUser().getTenantId(); + PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder); + DeviceId deviceId = new DeviceId(UUID.fromString(strDeviceId)); + return checkNotNull(rpcService.findAllByDeviceIdAndStatus(tenantId, deviceId, rpcStatus, pageLink)); + } catch (Exception e) { + throw handleException(e); + } + } + + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") + @RequestMapping(value = "/persisted/{rpcId}", method = RequestMethod.DELETE) + @ResponseBody + public void deleteResource(@PathVariable("rpcId") String strRpc) throws ThingsboardException { + checkParameter("RpcId", strRpc); + try { + rpcService.deleteRpc(getTenantId(), new RpcId(UUID.fromString(strRpc))); + } catch (Exception e) { + throw handleException(e); + } + } + private DeferredResult handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody) throws ThingsboardException { try { JsonNode rpcRequestBody = jsonMapper.readTree(requestBody); @@ -103,6 +155,7 @@ public class RpcController extends BaseController { long timeout = rpcRequestBody.has("timeout") ? rpcRequestBody.get("timeout").asLong() : defaultTimeout; long expTime = System.currentTimeMillis() + Math.max(minTimeout, timeout); UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID(); + boolean persisted = rpcRequestBody.has("persisted") && rpcRequestBody.get("persisted").asBoolean(); accessValidator.validate(currentUser, Operation.RPC_CALL, deviceId, new HttpValidationCallback(response, new FutureCallback>() { @Override public void onSuccess(@Nullable DeferredResult result) { @@ -111,7 +164,8 @@ public class RpcController extends BaseController { deviceId, oneWay, expTime, - body + body, + persisted ); deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse), currentUser); } diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java index 85c26d4c4d..a69402201a 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java @@ -157,6 +157,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { metaData.putValue("originServiceId", serviceId); metaData.putValue("expirationTime", Long.toString(msg.getExpirationTime())); metaData.putValue("oneway", Boolean.toString(msg.isOneway())); + metaData.putValue("persisted", Boolean.toString(msg.isPersisted())); Device device = deviceService.findDeviceById(msg.getTenantId(), msg.getDeviceId()); if (device != null) { diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java index c3b038c35a..e16af3695b 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java @@ -100,7 +100,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi @Override public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer consumer) { ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(), - src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody())); + src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted()); forwardRpcRequestToDeviceActor(request, response -> { if (src.isRestApiCall()) { sendRpcResponseToTbCore(src.getOriginServiceId(), response); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/TbRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/TbRpcService.java new file mode 100644 index 0000000000..e0343485db --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/rpc/TbRpcService.java @@ -0,0 +1,77 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.rpc; + +import com.fasterxml.jackson.databind.JsonNode; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.RpcId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.rpc.Rpc; +import org.thingsboard.server.common.data.rpc.RpcStatus; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.rpc.RpcService; +import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.queue.TbClusterService; + +@TbCoreComponent +@Service +@RequiredArgsConstructor +@Slf4j +public class TbRpcService { + private final RpcService rpcService; + private final TbClusterService tbClusterService; + + public Rpc save(TenantId tenantId, Rpc rpc) { + Rpc saved = rpcService.save(rpc); + pushRpcMsgToRuleEngine(tenantId, saved); + return saved; + } + + public void save(TenantId tenantId, RpcId rpcId, RpcStatus newStatus, JsonNode response) { + Rpc foundRpc = rpcService.findById(tenantId, rpcId); + if (foundRpc != null) { + foundRpc.setStatus(newStatus); + if (response != null) { + foundRpc.setResponse(response); + } + Rpc saved = rpcService.save(foundRpc); + pushRpcMsgToRuleEngine(tenantId, saved); + } else { + log.warn("[{}] Failed to update RPC status because RPC was already deleted", rpcId); + } + } + + private void pushRpcMsgToRuleEngine(TenantId tenantId, Rpc rpc) { + TbMsg msg = TbMsg.newMsg("RPC_" + rpc.getStatus().name(), rpc.getDeviceId(), TbMsgMetaData.EMPTY, JacksonUtil.toString(rpc)); + tbClusterService.pushMsgToRuleEngine(tenantId, rpc.getId(), msg, null); + } + + public Rpc findRpcById(TenantId tenantId, RpcId rpcId) { + return rpcService.findById(tenantId, rpcId); + } + + public PageData findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) { + return rpcService.findAllByDeviceIdAndStatus(tenantId, deviceId, rpcStatus, pageLink); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java index 540a47e8b0..ab9d0de231 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java +++ b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java @@ -47,11 +47,13 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.OtaPackageId; +import org.thingsboard.server.common.data.id.RpcId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TbResourceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.rpc.Rpc; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.controller.HttpValidationCallback; @@ -65,6 +67,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.ota.OtaPackageService; import org.thingsboard.server.dao.resource.ResourceService; +import org.thingsboard.server.dao.rpc.RpcService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.usagerecord.ApiUsageStateService; @@ -137,6 +140,9 @@ public class AccessValidator { @Autowired protected OtaPackageService otaPackageService; + @Autowired + protected RpcService rpcService; + private ExecutorService executor; @PostConstruct @@ -235,6 +241,9 @@ public class AccessValidator { case OTA_PACKAGE: validateOtaPackage(currentUser, operation, entityId, callback); return; + case RPC: + validateRpc(currentUser, operation, entityId, callback); + return; default: //TODO: add support of other entities throw new IllegalStateException("Not Implemented!"); @@ -261,6 +270,22 @@ public class AccessValidator { } } + private void validateRpc(final SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback callback) { + ListenableFuture rpcFurure = rpcService.findRpcByIdAsync(currentUser.getTenantId(), new RpcId(entityId.getId())); + Futures.addCallback(rpcFurure, getCallback(callback, rpc -> { + if (rpc == null) { + return ValidationResult.entityNotFound("Rpc with requested id wasn't found!"); + } else { + try { + accessControlService.checkPermission(currentUser, Resource.RPC, operation, entityId, rpc); + } catch (ThingsboardException e) { + return ValidationResult.accessDenied(e.getMessage()); + } + return ValidationResult.ok(rpc); + } + }), executor); + } + private void validateDeviceProfile(final SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback callback) { if (currentUser.isSystemAdmin()) { callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION)); diff --git a/application/src/main/java/org/thingsboard/server/service/security/permission/CustomerUserPermissions.java b/application/src/main/java/org/thingsboard/server/service/security/permission/CustomerUserPermissions.java index fc1705d688..b0c39a72d2 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/permission/CustomerUserPermissions.java +++ b/application/src/main/java/org/thingsboard/server/service/security/permission/CustomerUserPermissions.java @@ -41,6 +41,7 @@ public class CustomerUserPermissions extends AbstractPermissions { put(Resource.WIDGETS_BUNDLE, widgetsPermissionChecker); put(Resource.WIDGET_TYPE, widgetsPermissionChecker); put(Resource.EDGE, customerEntityPermissionChecker); + put(Resource.RPC, rpcPermissionChecker); } private static final PermissionChecker customerEntityPermissionChecker = @@ -138,4 +139,22 @@ public class CustomerUserPermissions extends AbstractPermissions { } }; + + private static final PermissionChecker rpcPermissionChecker = new PermissionChecker.GenericPermissionChecker(Operation.READ) { + + @Override + @SuppressWarnings("unchecked") + public boolean hasPermission(SecurityUser user, Operation operation, EntityId entityId, HasTenantId entity) { + if (!super.hasPermission(user, operation, entityId, entity)) { + return false; + } + if (entity.getTenantId() == null || entity.getTenantId().isNullUid()) { + return true; + } + if (!user.getTenantId().equals(entity.getTenantId())) { + return false; + } + return true; + } + }; } diff --git a/application/src/main/java/org/thingsboard/server/service/security/permission/Resource.java b/application/src/main/java/org/thingsboard/server/service/security/permission/Resource.java index 43c420a94a..c2890836b4 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/permission/Resource.java +++ b/application/src/main/java/org/thingsboard/server/service/security/permission/Resource.java @@ -39,7 +39,8 @@ public enum Resource { API_USAGE_STATE(EntityType.API_USAGE_STATE), TB_RESOURCE(EntityType.TB_RESOURCE), OTA_PACKAGE(EntityType.OTA_PACKAGE), - EDGE(EntityType.EDGE); + EDGE(EntityType.EDGE), + RPC(EntityType.RPC); private final EntityType entityType; diff --git a/application/src/main/java/org/thingsboard/server/service/security/permission/TenantAdminPermissions.java b/application/src/main/java/org/thingsboard/server/service/security/permission/TenantAdminPermissions.java index 8b4d44e938..ad753898e9 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/permission/TenantAdminPermissions.java +++ b/application/src/main/java/org/thingsboard/server/service/security/permission/TenantAdminPermissions.java @@ -44,6 +44,7 @@ public class TenantAdminPermissions extends AbstractPermissions { put(Resource.TB_RESOURCE, tbResourcePermissionChecker); put(Resource.OTA_PACKAGE, tenantEntityPermissionChecker); put(Resource.EDGE, tenantEntityPermissionChecker); + put(Resource.RPC, tenantEntityPermissionChecker); } public static final PermissionChecker tenantEntityPermissionChecker = new PermissionChecker() { diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index 980c3d2c10..c09d81bd45 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; @@ -41,13 +42,13 @@ import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.device.credentials.BasicMqttCredentials; import org.thingsboard.server.common.data.device.credentials.ProvisionDeviceCredentialsData; import org.thingsboard.server.common.data.device.profile.ProvisionDeviceProfileCredentials; -import org.thingsboard.server.common.data.ota.OtaPackageType; -import org.thingsboard.server.common.data.ota.OtaPackageUtil; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.OtaPackageId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.ota.OtaPackageType; +import org.thingsboard.server.common.data.ota.OtaPackageUtil; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; @@ -108,6 +109,7 @@ import java.util.stream.Collectors; @Slf4j @Service @TbCoreComponent +@RequiredArgsConstructor public class DefaultTransportApiService implements TransportApiService { private static final ObjectMapper mapper = new ObjectMapper(); @@ -129,28 +131,6 @@ public class DefaultTransportApiService implements TransportApiService { private final ConcurrentMap deviceCreationLocks = new ConcurrentHashMap<>(); - public DefaultTransportApiService(TbDeviceProfileCache deviceProfileCache, - TbTenantProfileCache tenantProfileCache, TbApiUsageStateService apiUsageStateService, DeviceService deviceService, - RelationService relationService, DeviceCredentialsService deviceCredentialsService, - DeviceStateService deviceStateService, DbCallbackExecutorService dbCallbackExecutorService, - TbClusterService tbClusterService, DataDecodingEncodingService dataDecodingEncodingService, - DeviceProvisionService deviceProvisionService, TbResourceService resourceService, OtaPackageService otaPackageService, OtaPackageDataCache otaPackageDataCache) { - this.deviceProfileCache = deviceProfileCache; - this.tenantProfileCache = tenantProfileCache; - this.apiUsageStateService = apiUsageStateService; - this.deviceService = deviceService; - this.relationService = relationService; - this.deviceCredentialsService = deviceCredentialsService; - this.deviceStateService = deviceStateService; - this.dbCallbackExecutorService = dbCallbackExecutorService; - this.tbClusterService = tbClusterService; - this.dataDecodingEncodingService = dataDecodingEncodingService; - this.deviceProvisionService = deviceProvisionService; - this.resourceService = resourceService; - this.otaPackageService = otaPackageService; - this.otaPackageDataCache = otaPackageDataCache; - } - @Override public ListenableFuture> handle(TbProtoQueueMsg tbProtoQueueMsg) { TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue(); diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpService.java new file mode 100644 index 0000000000..c0985eb4c1 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/ttl/rpc/RpcCleanUpService.java @@ -0,0 +1,83 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ttl.rpc; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.dao.rpc.RpcDao; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; +import org.thingsboard.server.dao.tenant.TenantDao; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.Date; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +@TbCoreComponent +@Service +@Slf4j +@RequiredArgsConstructor +public class RpcCleanUpService { + @Value("${sql.ttl.rpc.enabled}") + private boolean ttlTaskExecutionEnabled; + + private final TenantDao tenantDao; + private final PartitionService partitionService; + private final TbTenantProfileCache tenantProfileCache; + private final RpcDao rpcDao; + + @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.rpc.checking_interval})}", fixedDelayString = "${sql.ttl.rpc.checking_interval}") + public void cleanUp() { + if (ttlTaskExecutionEnabled) { + PageLink tenantsBatchRequest = new PageLink(10_000, 0); + PageData tenantsIds; + do { + tenantsIds = tenantDao.findTenantsIds(tenantsBatchRequest); + for (TenantId tenantId : tenantsIds.getData()) { + if (!partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) { + continue; + } + + Optional tenantProfileConfiguration = tenantProfileCache.get(tenantId).getProfileConfiguration(); + if (tenantProfileConfiguration.isEmpty() || tenantProfileConfiguration.get().getRpcTtlDays() == 0) { + continue; + } + + long ttl = TimeUnit.DAYS.toMillis(tenantProfileConfiguration.get().getRpcTtlDays()); + long expirationTime = System.currentTimeMillis() - ttl; + + long totalRemoved = rpcDao.deleteOutdatedRpcByTenantId(tenantId, expirationTime); + + if (totalRemoved > 0) { + log.info("Removed {} outdated rpc(s) for tenant {} older than {}", totalRemoved, tenantId, new Date(expirationTime)); + } + } + + tenantsBatchRequest = tenantsBatchRequest.nextPageLink(); + } while (tenantsIds.hasNext()); + } + } + +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 69258a644f..79e8d9e1d3 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -276,6 +276,9 @@ sql: alarms: checking_interval: "${SQL_ALARMS_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:3000}" # To delete outdated alarms not all at once but in batches + rpc: + enabled: "${SQL_TTL_RPC_ENABLED:true}" + checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours # Actor system parameters actors: diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/X509LwM2MIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/X509LwM2MIntegrationTest.java index 661f7c5474..7b60414e9c 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/X509LwM2MIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/X509LwM2MIntegrationTest.java @@ -75,13 +75,11 @@ public class X509LwM2MIntegrationTest extends AbstractLwM2MIntegrationTest { return device; } - //TODO: use different endpoints to isolate tests. - @Ignore() @Test public void testConnectAndObserveTelemetry() throws Exception { createDeviceProfile(TRANSPORT_CONFIGURATION); X509ClientCredentials credentials = new X509ClientCredentials(); - credentials.setEndpoint(endpoint+1); + credentials.setEndpoint(endpoint); Device device = createDevice(credentials); SingleEntityFilter sef = new SingleEntityFilter(); @@ -99,7 +97,7 @@ public class X509LwM2MIntegrationTest extends AbstractLwM2MIntegrationTest { wsClient.waitForReply(); wsClient.registerWaitForUpdate(); - LwM2MTestClient client = new LwM2MTestClient(executor, endpoint+1); + LwM2MTestClient client = new LwM2MTestClient(executor, endpoint); Security security = x509(serverUri, 123, clientX509Cert.getEncoded(), clientPrivateKeyFromCert.getEncoded(), serverX509Cert.getEncoded()); client.init(security, coapConfig); String msg = wsClient.waitForUpdate(); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/rpc/RpcService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/rpc/RpcService.java new file mode 100644 index 0000000000..4bdb1a169d --- /dev/null +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/rpc/RpcService.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.rpc; + +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.RpcId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.rpc.Rpc; +import org.thingsboard.server.common.data.rpc.RpcStatus; + +public interface RpcService { + Rpc save(Rpc rpc); + + void deleteRpc(TenantId tenantId, RpcId id); + + void deleteAllRpcByTenantId(TenantId tenantId); + + Rpc findById(TenantId tenantId, RpcId id); + + ListenableFuture findRpcByIdAsync(TenantId tenantId, RpcId id); + + PageData findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink); +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 002cbbb733..efc81846d5 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -76,6 +76,12 @@ public class DataConstants { public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE"; + public static final String RPC_QUEUED = "RPC_QUEUED"; + public static final String RPC_DELIVERED = "RPC_DELIVERED"; + public static final String RPC_SUCCESSFUL = "RPC_SUCCESSFUL"; + public static final String RPC_TIMEOUT = "RPC_TIMEOUT"; + public static final String RPC_FAILED = "RPC_FAILED"; + public static final String DEFAULT_SECRET_KEY = ""; public static final String SECRET_KEY_FIELD_NAME = "secretKey"; public static final String DURATION_MS_FIELD_NAME = "durationMs"; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/EntityType.java b/common/data/src/main/java/org/thingsboard/server/common/data/EntityType.java index cf6c6fd9a7..224af1fc3b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/EntityType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/EntityType.java @@ -19,5 +19,5 @@ package org.thingsboard.server.common.data; * @author Andrew Shvayka */ public enum EntityType { - TENANT, CUSTOMER, USER, DASHBOARD, ASSET, DEVICE, ALARM, RULE_CHAIN, RULE_NODE, ENTITY_VIEW, WIDGETS_BUNDLE, WIDGET_TYPE, TENANT_PROFILE, DEVICE_PROFILE, API_USAGE_STATE, TB_RESOURCE, OTA_PACKAGE, EDGE; + TENANT, CUSTOMER, USER, DASHBOARD, ASSET, DEVICE, ALARM, RULE_CHAIN, RULE_NODE, ENTITY_VIEW, WIDGETS_BUNDLE, WIDGET_TYPE, TENANT_PROFILE, DEVICE_PROFILE, API_USAGE_STATE, TB_RESOURCE, OTA_PACKAGE, EDGE, RPC; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java index a4b2327c75..6d8ddfdd18 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java @@ -75,6 +75,8 @@ public class EntityIdFactory { return new OtaPackageId(uuid); case EDGE: return new EdgeId(uuid); + case RPC: + return new RpcId(uuid); } throw new IllegalArgumentException("EntityType " + type + " is not supported!"); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/id/RpcId.java b/common/data/src/main/java/org/thingsboard/server/common/data/id/RpcId.java new file mode 100644 index 0000000000..a4cd8797b0 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/id/RpcId.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.id; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.thingsboard.server.common.data.EntityType; + +import java.util.UUID; + +public final class RpcId extends UUIDBased implements EntityId { + + private static final long serialVersionUID = 1L; + + @JsonCreator + public RpcId(@JsonProperty("id") UUID id) { + super(id); + } + + @JsonIgnore + @Override + public EntityType getEntityType() { + return EntityType.RPC; + } +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rpc/Rpc.java b/common/data/src/main/java/org/thingsboard/server/common/data/rpc/Rpc.java new file mode 100644 index 0000000000..504c89b569 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rpc/Rpc.java @@ -0,0 +1,54 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.rpc; + +import com.fasterxml.jackson.databind.JsonNode; +import lombok.Data; +import lombok.EqualsAndHashCode; +import org.thingsboard.server.common.data.BaseData; +import org.thingsboard.server.common.data.HasTenantId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.RpcId; +import org.thingsboard.server.common.data.id.TenantId; + +@Data +@EqualsAndHashCode(callSuper = true) +public class Rpc extends BaseData implements HasTenantId { + private TenantId tenantId; + private DeviceId deviceId; + private long expirationTime; + private JsonNode request; + private JsonNode response; + private RpcStatus status; + + public Rpc() { + super(); + } + + public Rpc(RpcId id) { + super(id); + } + + public Rpc(Rpc rpc) { + super(rpc); + this.tenantId = rpc.getTenantId(); + this.deviceId = rpc.getDeviceId(); + this.expirationTime = rpc.getExpirationTime(); + this.request = rpc.getRequest(); + this.response = rpc.getResponse(); + this.status = rpc.getStatus(); + } +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java b/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java new file mode 100644 index 0000000000..c80d0c5993 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcStatus.java @@ -0,0 +1,20 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.rpc; + +public enum RpcStatus { + QUEUED, DELIVERED, SUCCESSFUL, TIMEOUT, FAILED +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java index 8cdccfe8bd..ce10f95055 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java @@ -56,6 +56,7 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura private int defaultStorageTtlDays; private int alarmsTtlDays; + private int rpcTtlDays; private double warnThreshold; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java b/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java index 3cce7b1d33..b3b33146d0 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java @@ -34,5 +34,6 @@ public class ToDeviceRpcRequest implements Serializable { private final boolean oneway; private final long expirationTime; private final ToDeviceRpcRequestBody body; + private final boolean persisted; } diff --git a/common/queue/src/main/proto/queue.proto b/common/queue/src/main/proto/queue.proto index acea8d6382..049b6934e7 100644 --- a/common/queue/src/main/proto/queue.proto +++ b/common/queue/src/main/proto/queue.proto @@ -318,6 +318,9 @@ message SubscribeToRPCMsg { SessionType sessionType = 2; } +message SendPendingRPCMsg { +} + message ToDeviceRpcRequestMsg { int32 requestId = 1; string methodName = 2; @@ -325,6 +328,8 @@ message ToDeviceRpcRequestMsg { int64 expirationTime = 4; int64 requestIdMSB = 5; int64 requestIdLSB = 6; + bool oneway = 7; + bool persisted = 8; } message ToDeviceRpcResponseMsg { @@ -332,6 +337,13 @@ message ToDeviceRpcResponseMsg { string payload = 2; } +message ToDevicePersistedRpcResponseMsg { + int32 requestId = 1; + int64 requestIdMSB = 2; + int64 requestIdLSB = 3; + string status = 4; +} + message ToServerRpcRequestMsg { int32 requestId = 1; string methodName = 2; @@ -435,6 +447,8 @@ message TransportToDeviceActorMsg { SubscriptionInfoProto subscriptionInfo = 7; ClaimDeviceMsg claimDevice = 8; ProvisionDeviceRequestMsg provisionDevice = 9; + ToDevicePersistedRpcResponseMsg persistedRpcResponseMsg = 10; + SendPendingRPCMsg sendPendingRPC = 11; } message TransportToRuleEngineMsg { diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 95cdeada88..85b0f5e03d 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportC import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadConfiguration; import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration; import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration; +import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.data.security.DeviceTokenCredentials; import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.common.msg.session.SessionMsgType; @@ -332,14 +333,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource { break; case TO_SERVER_RPC_REQUEST: transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, - transportConfigurationContainer.getRpcRequestDynamicMessageBuilder()), timeout); + transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), sessionInfo), timeout); transportService.process(sessionInfo, coapTransportAdaptor.convertToServerRpcRequest(sessionId, request), new CoapNoOpCallback(exchange)); break; case GET_ATTRIBUTES_REQUEST: transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, - transportConfigurationContainer.getRpcRequestDynamicMessageBuilder()), timeout); + transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), sessionInfo), timeout); transportService.process(sessionInfo, coapTransportAdaptor.convertToGetAttributes(sessionId, request), new CoapNoOpCallback(exchange)); @@ -362,12 +363,12 @@ public class CoapTransportResource extends AbstractCoapTransportResource { private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) { tokenToSessionInfoMap.putIfAbsent(token, sessionInfo); - transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder)); + transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo)); transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); } - private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) { - return new CoapSessionListener(this, exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder); + private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) { + return new CoapSessionListener(this, exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo); } private String getTokenFromRequest(Request request) { @@ -455,12 +456,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource { private final CoapExchange exchange; private final CoapTransportAdaptor coapTransportAdaptor; private final DynamicMessage.Builder rpcRequestDynamicMessageBuilder; + private final TransportProtos.SessionInfoProto sessionInfo; - CoapSessionListener(CoapTransportResource coapTransportResource, CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) { + CoapSessionListener(CoapTransportResource coapTransportResource, CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) { this.coapTransportResource = coapTransportResource; this.exchange = exchange; this.coapTransportAdaptor = coapTransportAdaptor; this.rpcRequestDynamicMessageBuilder = rpcRequestDynamicMessageBuilder; + this.sessionInfo = sessionInfo; } @Override @@ -503,11 +506,31 @@ public class CoapTransportResource extends AbstractCoapTransportResource { @Override public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg msg) { + boolean successful; try { exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder)); + successful = true; } catch (AdaptorException e) { log.trace("Failed to reply due to error", e); exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); + successful = false; + } + if (msg.getPersisted()) { + RpcStatus status; + if (!successful) { + status = RpcStatus.FAILED; + } else if (msg.getOneway()) { + status = RpcStatus.SUCCESSFUL; + } else { + status = RpcStatus.DELIVERED; + } + TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder() + .setRequestId(msg.getRequestId()) + .setRequestIdLSB(msg.getRequestIdLSB()) + .setRequestIdMSB(msg.getRequestIdMSB()) + .setStatus(status.name()) + .build(); + coapTransportResource.transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY); } } diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index f4a3f705af..cfb25e9d78 100644 --- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -17,6 +17,7 @@ package org.thingsboard.server.transport.http; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; @@ -34,9 +35,10 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; import org.thingsboard.server.common.data.DeviceTransportType; -import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.TbTransportService; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.ota.OtaPackageType; +import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportContext; import org.thingsboard.server.common.transport.TransportService; @@ -95,7 +97,9 @@ public class DeviceApiController implements TbTransportService { request.addAllSharedAttributeNames(sharedKeySet); } TransportService transportService = transportContext.getTransportService(); - transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout()); + transportService.registerSyncSession(sessionInfo, + new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo), + transportContext.getDefaultTimeout()); transportService.process(sessionInfo, request.build(), new SessionCloseOnErrorCallback(transportService, sessionInfo)); })); return responseWriter; @@ -151,7 +155,8 @@ public class DeviceApiController implements TbTransportService { transportContext.getTransportService().process(DeviceTransportType.DEFAULT, ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(), new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { TransportService transportService = transportContext.getTransportService(); - transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), + transportService.registerSyncSession(sessionInfo, + new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo), timeout == 0 ? transportContext.getDefaultTimeout() : timeout); transportService.process(sessionInfo, SubscribeToRPCMsg.getDefaultInstance(), new SessionCloseOnErrorCallback(transportService, sessionInfo)); @@ -181,7 +186,9 @@ public class DeviceApiController implements TbTransportService { new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { JsonObject request = new JsonParser().parse(json).getAsJsonObject(); TransportService transportService = transportContext.getTransportService(); - transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout()); + transportService.registerSyncSession(sessionInfo, + new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo), + transportContext.getDefaultTimeout()); transportService.process(sessionInfo, ToServerRpcRequestMsg.newBuilder().setRequestId(0) .setMethodName(request.get("method").getAsString()) .setParams(request.get("params").toString()).build(), @@ -198,7 +205,8 @@ public class DeviceApiController implements TbTransportService { transportContext.getTransportService().process(DeviceTransportType.DEFAULT, ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(), new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> { TransportService transportService = transportContext.getTransportService(); - transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), + transportService.registerSyncSession(sessionInfo, + new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo), timeout == 0 ? transportContext.getDefaultTimeout() : timeout); transportService.process(sessionInfo, SubscribeToAttributeUpdatesMsg.getDefaultInstance(), new SessionCloseOnErrorCallback(transportService, sessionInfo)); @@ -372,13 +380,12 @@ public class DeviceApiController implements TbTransportService { } } + @RequiredArgsConstructor private static class HttpSessionListener implements SessionMsgListener { private final DeferredResult responseWriter; - - HttpSessionListener(DeferredResult responseWriter) { - this.responseWriter = responseWriter; - } + private final TransportService transportService; + private final SessionInfoProto sessionInfo; @Override public void onGetAttributesResponse(GetAttributeResponseMsg msg) { @@ -399,6 +406,21 @@ public class DeviceApiController implements TbTransportService { @Override public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg msg) { responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK)); + if (msg.getPersisted()) { + RpcStatus status; + if (msg.getOneway()) { + status = RpcStatus.SUCCESSFUL; + } else { + status = RpcStatus.DELIVERED; + } + TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder() + .setRequestId(msg.getRequestId()) + .setRequestIdLSB(msg.getRequestIdLSB()) + .setRequestIdMSB(msg.getRequestIdMSB()) + .setStatus(status.name()) + .build(); + transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY); + } } @Override diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/bootstrap/secure/LwM2MBootstrapSecurityStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/bootstrap/secure/LwM2MBootstrapSecurityStore.java index 7740778adf..c7ec42d879 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/bootstrap/secure/LwM2MBootstrapSecurityStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/bootstrap/secure/LwM2MBootstrapSecurityStore.java @@ -155,7 +155,7 @@ public class LwM2MBootstrapSecurityStore implements BootstrapSecurityStore { LwM2MServerBootstrap profileLwm2mServer = JacksonUtil.fromString(JacksonUtil.toString(bootstrapObject.getLwm2mServer()), LwM2MServerBootstrap.class); UUID sessionUUiD = UUID.randomUUID(); TransportProtos.SessionInfoProto sessionInfo = helper.getValidateSessionInfo(store.getMsg(), sessionUUiD.getMostSignificantBits(), sessionUUiD.getLeastSignificantBits()); - context.getTransportService().registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(null, null, null, sessionInfo)); + context.getTransportService().registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(null, null, null, sessionInfo, context.getTransportService())); if (this.getValidatedSecurityMode(lwM2MBootstrapConfig.bootstrapServer, profileServerBootstrap, lwM2MBootstrapConfig.lwm2mServer, profileLwm2mServer)) { lwM2MBootstrapConfig.bootstrapServer = new LwM2MServerBootstrap(lwM2MBootstrapConfig.bootstrapServer, profileServerBootstrap); lwM2MBootstrapConfig.lwm2mServer = new LwM2MServerBootstrap(lwM2MBootstrapConfig.lwm2mServer, profileLwm2mServer); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/secure/TbLwM2MDtlsCertificateVerifier.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/secure/TbLwM2MDtlsCertificateVerifier.java index 792ba131e8..04b69c815f 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/secure/TbLwM2MDtlsCertificateVerifier.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/secure/TbLwM2MDtlsCertificateVerifier.java @@ -42,8 +42,8 @@ import org.thingsboard.server.common.transport.util.SslUtil; import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; import org.thingsboard.server.transport.lwm2m.secure.credentials.LwM2MCredentials; -import org.thingsboard.server.transport.lwm2m.server.store.TbEditableSecurityStore; import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MDtlsSessionStore; +import org.thingsboard.server.transport.lwm2m.server.store.TbMainSecurityStore; import javax.annotation.PostConstruct; import javax.security.auth.x500.X500Principal; @@ -67,7 +67,7 @@ public class TbLwM2MDtlsCertificateVerifier implements NewAdvancedCertificateVer private final TbLwM2MDtlsSessionStore sessionStorage; private final LwM2MTransportServerConfig config; private final LwM2mCredentialsSecurityInfoValidator securityInfoValidator; - private final TbEditableSecurityStore securityStore; + private final TbMainSecurityStore securityStore; @SuppressWarnings("deprecation") private StaticCertificateVerifier staticCertificateVerifier; @@ -134,7 +134,7 @@ public class TbLwM2MDtlsCertificateVerifier implements NewAdvancedCertificateVer if (msg.hasDeviceInfo() && deviceProfile != null) { sessionStorage.put(endpoint, new TbX509DtlsSessionInfo(cert.getSubjectX500Principal().getName(), msg)); try { - securityStore.put(securityInfo); + securityStore.putX509(securityInfo); } catch (NonUniqueSecurityInfoException e) { log.trace("Failed to add security info: {}", securityInfo, e); } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java index f39ced6dfc..bcc23dd778 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mSessionMsgListener.java @@ -23,7 +23,10 @@ import org.jetbrains.annotations.NotNull; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.ResourceType; +import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.transport.SessionMsgListener; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; @@ -45,6 +48,7 @@ public class LwM2mSessionMsgListener implements GenericFutureListener lwM2mClientsByEndpoint = new ConcurrentHashMap<>(); private final Map lwM2mClientsByRegistrationId = new ConcurrentHashMap<>(); private final Map profiles = new ConcurrentHashMap<>(); @@ -75,6 +76,9 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { oldSession = lwM2MClient.getSession(); TbLwM2MSecurityInfo securityInfo = securityStore.getTbLwM2MSecurityInfoByEndpoint(lwM2MClient.getEndpoint()); if (securityInfo.getSecurityMode() != null) { + if (SecurityMode.X509.equals(securityInfo.getSecurityMode())) { + securityStore.registerX509(registration.getEndpoint(), registration.getId()); + } if (securityInfo.getDeviceProfile() != null) { profileUpdate(securityInfo.getDeviceProfile()); if (securityInfo.getSecurityInfo() != null) { @@ -124,7 +128,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { if (currentRegistration.getId().equals(registration.getId())) { lwM2MClient.setState(LwM2MClientState.UNREGISTERED); lwM2mClientsByEndpoint.remove(lwM2MClient.getEndpoint()); - this.securityStore.remove(lwM2MClient.getEndpoint()); + this.securityStore.remove(lwM2MClient.getEndpoint(), registration.getId()); UUID profileId = lwM2MClient.getProfileId(); if (profileId != null) { Optional otherClients = lwM2mClientsByRegistrationId.values().stream().filter(e -> e.getProfileId().equals(profileId)).findFirst(); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mSecurityStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mSecurityStore.java index d47be49978..bf1f275f32 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mSecurityStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mSecurityStore.java @@ -22,13 +22,22 @@ import org.jetbrains.annotations.Nullable; import org.thingsboard.server.transport.lwm2m.secure.LwM2mCredentialsSecurityInfoValidator; import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import static org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mTypeServer.CLIENT; @Slf4j -public class TbLwM2mSecurityStore implements TbEditableSecurityStore { +public class TbLwM2mSecurityStore implements TbMainSecurityStore { private final TbEditableSecurityStore securityStore; private final LwM2mCredentialsSecurityInfoValidator validator; + private final ConcurrentMap> endpointRegistrations = new ConcurrentHashMap<>(); public TbLwM2mSecurityStore(TbEditableSecurityStore securityStore, LwM2mCredentialsSecurityInfoValidator validator) { this.securityStore = securityStore; @@ -61,24 +70,42 @@ public class TbLwM2mSecurityStore implements TbEditableSecurityStore { @Nullable public SecurityInfo fetchAndPutSecurityInfo(String credentialsId) { TbLwM2MSecurityInfo securityInfo = validator.getEndpointSecurityInfoByCredentialsId(credentialsId, CLIENT); - try { - if (securityInfo != null) { - securityStore.put(securityInfo); - } - } catch (NonUniqueSecurityInfoException e) { - log.trace("Failed to add security info: {}", securityInfo, e); - } + doPut(securityInfo); return securityInfo != null ? securityInfo.getSecurityInfo() : null; } - @Override - public void put(TbLwM2MSecurityInfo tbSecurityInfo) throws NonUniqueSecurityInfoException { - securityStore.put(tbSecurityInfo); + private void doPut(TbLwM2MSecurityInfo securityInfo) { + if (securityInfo != null) { + try { + securityStore.put(securityInfo); + } catch (NonUniqueSecurityInfoException e) { + log.trace("Failed to add security info: {}", securityInfo, e); + } + } } @Override - public void remove(String endpoint) { - //TODO: Make sure we delay removal of security store from endpoint due to reg/unreg race condition. -// securityStore.remove(endpoint); + public void putX509(TbLwM2MSecurityInfo securityInfo) throws NonUniqueSecurityInfoException { + securityStore.put(securityInfo); + } + + @Override + public void registerX509(String endpoint, String registrationId) { + endpointRegistrations.computeIfAbsent(endpoint, ep -> new HashSet<>()).add(registrationId); + } + + @Override + public void remove(String endpoint, String registrationId) { + Set epRegistrationIds = endpointRegistrations.get(endpoint); + boolean shouldRemove; + if (epRegistrationIds == null) { + shouldRemove = true; + } else { + epRegistrationIds.remove(registrationId); + shouldRemove = epRegistrationIds.isEmpty(); + } + if (shouldRemove) { + securityStore.remove(endpoint); + } } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mStoreFactory.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mStoreFactory.java index 154de636de..b9eb865df5 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mStoreFactory.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mStoreFactory.java @@ -51,7 +51,7 @@ public class TbLwM2mStoreFactory { } @Bean - private TbEditableSecurityStore securityStore() { + private TbMainSecurityStore securityStore() { return new TbLwM2mSecurityStore(redisConfiguration.isPresent() && useRedis ? new TbLwM2mRedisSecurityStore(redisConfiguration.get().redisConnectionFactory()) : new TbInMemorySecurityStore(), validator); } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbMainSecurityStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbMainSecurityStore.java new file mode 100644 index 0000000000..f4394fb337 --- /dev/null +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbMainSecurityStore.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.store; + +import org.eclipse.leshan.server.security.NonUniqueSecurityInfoException; +import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo; + +public interface TbMainSecurityStore extends TbSecurityStore { + + void putX509(TbLwM2MSecurityInfo tbSecurityInfo) throws NonUniqueSecurityInfoException; + + void registerX509(String endpoint, String registrationId); + + void remove(String endpoint, String registrationId); + +} diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java index 767c316cc6..1d14de3ba0 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java @@ -212,7 +212,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl } logService.log(lwM2MClient, LOG_LWM2M_INFO + ": Client registered with registration id: " + registration.getId()); SessionInfoProto sessionInfo = lwM2MClient.getSession(); - transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo)); + transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo, transportService)); log.warn("40) sessionId [{}] Registering rpc subscription after Registration client", new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB())); TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder() .setSessionInfo(sessionInfo) @@ -888,7 +888,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl */ private void reportActivityAndRegister(SessionInfoProto sessionInfo) { if (sessionInfo != null && transportService.reportActivity(sessionInfo) == null) { - transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo)); + transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo, transportService)); this.reportActivitySubscription(sessionInfo); } } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 961c20ed34..c2faa3d808 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt; import com.fasterxml.jackson.databind.JsonNode; import com.google.gson.JsonParseException; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.mqtt.MqttConnAckMessage; @@ -47,8 +48,9 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.device.profile.MqttTopics; -import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.id.OtaPackageId; +import org.thingsboard.server.common.data.ota.OtaPackageType; +import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.msg.EncryptionUtil; import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.common.transport.SessionMsgListener; @@ -813,7 +815,31 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { log.trace("[{}] Received RPC command to device", sessionId); try { - deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); + deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest) + .ifPresent(payload -> { + ChannelFuture channelFuture = deviceSessionCtx.getChannel().writeAndFlush(payload); + if (rpcRequest.getPersisted()) { + channelFuture.addListener(future -> { + RpcStatus status; + Throwable t = future.cause(); + if (t != null) { + log.error("Failed delivering RPC command to device!", t); + status = RpcStatus.FAILED; + } else if (rpcRequest.getOneway()) { + status = RpcStatus.SUCCESSFUL; + } else { + status = RpcStatus.DELIVERED; + } + TransportProtos.ToDevicePersistedRpcResponseMsg msg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder() + .setRequestId(rpcRequest.getRequestId()) + .setRequestIdLSB(rpcRequest.getRequestIdLSB()) + .setRequestIdMSB(rpcRequest.getRequestIdMSB()) + .setStatus(status.name()) + .build(); + transportService.process(deviceSessionCtx.getSessionInfo(), msg, TransportServiceCallback.EMPTY); + }); + } + }); } catch (Exception e) { log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index 707586b7de..9e3633cb4b 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -15,9 +15,13 @@ */ package org.thingsboard.server.transport.mqtt.session; +import io.netty.channel.ChannelFuture; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.transport.SessionMsgListener; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; @@ -32,9 +36,11 @@ import java.util.concurrent.ConcurrentMap; public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener { private final GatewaySessionHandler parent; + private final TransportService transportService; public GatewayDeviceSessionCtx(GatewaySessionHandler parent, TransportDeviceInfo deviceInfo, - DeviceProfile deviceProfile, ConcurrentMap mqttQoSMap) { + DeviceProfile deviceProfile, ConcurrentMap mqttQoSMap, + TransportService transportService) { super(UUID.randomUUID(), mqttQoSMap); this.parent = parent; setSessionInfo(SessionInfoProto.newBuilder() @@ -56,6 +62,7 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple .build()); setDeviceInfo(deviceInfo); setDeviceProfile(deviceProfile); + this.transportService = transportService; } @Override @@ -89,7 +96,32 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple @Override public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg request) { try { - parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(parent::writeAndFlush); + parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent( + payload -> { + ChannelFuture channelFuture = parent.writeAndFlush(payload); + if (request.getPersisted()) { + channelFuture.addListener(future -> { + RpcStatus status; + Throwable t = future.cause(); + if (t != null) { + log.error("Failed delivering RPC command to device!", t); + status = RpcStatus.FAILED; + } else if (request.getOneway()) { + status = RpcStatus.SUCCESSFUL; + } else { + status = RpcStatus.DELIVERED; + } + TransportProtos.ToDevicePersistedRpcResponseMsg msg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder() + .setRequestId(request.getRequestId()) + .setRequestIdLSB(request.getRequestIdLSB()) + .setRequestIdMSB(request.getRequestIdMSB()) + .setStatus(status.name()) + .build(); + transportService.process(getSessionInfo(), msg, TransportServiceCallback.EMPTY); + }); + } + } + ); } catch (Exception e) { log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index 891c466151..193a7d24f5 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -28,6 +28,7 @@ import com.google.gson.JsonSyntaxException; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.ProtocolStringList; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; @@ -188,8 +189,8 @@ public class GatewaySessionHandler { } } - void writeAndFlush(MqttMessage mqttMessage) { - channel.writeAndFlush(mqttMessage); + ChannelFuture writeAndFlush(MqttMessage mqttMessage) { + return channel.writeAndFlush(mqttMessage); } int nextMsgId() { @@ -251,7 +252,7 @@ public class GatewaySessionHandler { new TransportServiceCallback() { @Override public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) { - GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap); + GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java index ce70ece5ff..da459214c0 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java @@ -26,7 +26,9 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration; import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.transport.SessionMsgListener; +import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; @@ -139,6 +141,21 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S @Override public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) { snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest); + if (toDeviceRequest.getPersisted()) { + RpcStatus status; + if (toDeviceRequest.getOneway()) { + status = RpcStatus.SUCCESSFUL; + } else { + status = RpcStatus.DELIVERED; + } + TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder() + .setRequestId(toDeviceRequest.getRequestId()) + .setRequestIdLSB(toDeviceRequest.getRequestIdLSB()) + .setRequestIdMSB(toDeviceRequest.getRequestIdMSB()) + .setStatus(status.name()) + .build(); + snmpTransportContext.getTransportService().process(getSessionInfo(), responseMsg, TransportServiceCallback.EMPTY); + } } @Override diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index 4a4e68f64f..e692d3b002 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.common.transport.service.SessionMetaData; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; @@ -109,6 +110,8 @@ public interface TransportService { void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback callback); + void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDevicePersistedRpcResponseMsg msg, TransportServiceCallback callback); + void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback callback); void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback callback); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 77b75dc07a..b4e536a49b 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -557,6 +557,15 @@ public class DefaultTransportService implements TransportService { } } + @Override + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDevicePersistedRpcResponseMsg msg, TransportServiceCallback callback) { + if (checkLimits(sessionInfo, msg, callback)) { + reportActivityInternal(sessionInfo); + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPersistedRpcResponseMsg(msg).build(), + new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback)); + } + } + private void processTimeout(String requestId) { RpcRequestMetadata data = toServerRpcPendingMap.remove(requestId); if (data != null) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java index 87cee53991..75febf4dad 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java @@ -506,6 +506,17 @@ public class ModelConstants { public static final String OTA_PACKAGE_DATA_SIZE_COLUMN = "data_size"; public static final String OTA_PACKAGE_ADDITIONAL_INFO_COLUMN = ADDITIONAL_INFO_PROPERTY; + /** + * Persisted RPC constants. + */ + public static final String RPC_TABLE_NAME = "rpc"; + public static final String RPC_TENANT_ID_COLUMN = TENANT_ID_COLUMN; + public static final String RPC_DEVICE_ID = "device_id"; + public static final String RPC_EXPIRATION_TIME = "expiration_time"; + public static final String RPC_REQUEST = "request"; + public static final String RPC_RESPONSE = "response"; + public static final String RPC_STATUS = "status"; + /** * Edge constants. */ diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/RpcEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RpcEntity.java new file mode 100644 index 0000000000..a8823cb8cd --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RpcEntity.java @@ -0,0 +1,102 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.model.sql; + +import com.fasterxml.jackson.databind.JsonNode; +import lombok.Data; +import lombok.EqualsAndHashCode; +import org.hibernate.annotations.Type; +import org.hibernate.annotations.TypeDef; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.RpcId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.rpc.Rpc; +import org.thingsboard.server.common.data.rpc.RpcStatus; +import org.thingsboard.server.dao.model.BaseEntity; +import org.thingsboard.server.dao.model.BaseSqlEntity; +import org.thingsboard.server.dao.util.mapping.JsonStringType; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.EnumType; +import javax.persistence.Enumerated; +import javax.persistence.Table; +import java.util.UUID; + +import static org.thingsboard.server.dao.model.ModelConstants.RPC_DEVICE_ID; +import static org.thingsboard.server.dao.model.ModelConstants.RPC_EXPIRATION_TIME; +import static org.thingsboard.server.dao.model.ModelConstants.RPC_REQUEST; +import static org.thingsboard.server.dao.model.ModelConstants.RPC_RESPONSE; +import static org.thingsboard.server.dao.model.ModelConstants.RPC_STATUS; +import static org.thingsboard.server.dao.model.ModelConstants.RPC_TABLE_NAME; +import static org.thingsboard.server.dao.model.ModelConstants.RPC_TENANT_ID_COLUMN; + +@Data +@EqualsAndHashCode(callSuper = true) +@Entity +@TypeDef(name = "json", typeClass = JsonStringType.class) +@Table(name = RPC_TABLE_NAME) +public class RpcEntity extends BaseSqlEntity implements BaseEntity { + + @Column(name = RPC_TENANT_ID_COLUMN) + private UUID tenantId; + + @Column(name = RPC_DEVICE_ID) + private UUID deviceId; + + @Column(name = RPC_EXPIRATION_TIME) + private long expirationTime; + + @Type(type = "json") + @Column(name = RPC_REQUEST) + private JsonNode request; + + @Type(type = "json") + @Column(name = RPC_RESPONSE) + private JsonNode response; + + @Enumerated(EnumType.STRING) + @Column(name = RPC_STATUS) + private RpcStatus status; + + public RpcEntity() { + super(); + } + + public RpcEntity(Rpc rpc) { + this.setUuid(rpc.getUuidId()); + this.createdTime = rpc.getCreatedTime(); + this.tenantId = rpc.getTenantId().getId(); + this.deviceId = rpc.getDeviceId().getId(); + this.expirationTime = rpc.getExpirationTime(); + this.request = rpc.getRequest(); + this.response = rpc.getResponse(); + this.status = rpc.getStatus(); + } + + @Override + public Rpc toData() { + Rpc rpc = new Rpc(new RpcId(id)); + rpc.setCreatedTime(createdTime); + rpc.setTenantId(new TenantId(tenantId)); + rpc.setDeviceId(new DeviceId(deviceId)); + rpc.setExpirationTime(expirationTime); + rpc.setRequest(request); + rpc.setResponse(response); + rpc.setStatus(status); + return rpc; + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/rpc/BaseRpcService.java b/dao/src/main/java/org/thingsboard/server/dao/rpc/BaseRpcService.java new file mode 100644 index 0000000000..02b4bbe433 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/rpc/BaseRpcService.java @@ -0,0 +1,100 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.rpc; + +import com.google.common.util.concurrent.ListenableFuture; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.RpcId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.rpc.Rpc; +import org.thingsboard.server.common.data.rpc.RpcStatus; +import org.thingsboard.server.dao.service.PaginatedRemover; + +import static org.thingsboard.server.dao.service.Validator.validateId; +import static org.thingsboard.server.dao.service.Validator.validatePageLink; + +@Service +@Slf4j +@RequiredArgsConstructor +public class BaseRpcService implements RpcService { + public static final String INCORRECT_TENANT_ID = "Incorrect tenantId "; + public static final String INCORRECT_RPC_ID = "Incorrect rpcId "; + + private final RpcDao rpcDao; + + @Override + public Rpc save(Rpc rpc) { + log.trace("Executing save, [{}]", rpc); + return rpcDao.save(rpc.getTenantId(), rpc); + } + + @Override + public void deleteRpc(TenantId tenantId, RpcId rpcId) { + log.trace("Executing deleteRpc, tenantId [{}], rpcId [{}]", tenantId, rpcId); + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); + validateId(rpcId, INCORRECT_RPC_ID + rpcId); + rpcDao.removeById(tenantId, rpcId.getId()); + } + + @Override + public void deleteAllRpcByTenantId(TenantId tenantId) { + log.trace("Executing deleteAllRpcByTenantId, tenantId [{}]", tenantId); + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); + tenantRpcRemover.removeEntities(tenantId, tenantId); + } + + @Override + public Rpc findById(TenantId tenantId, RpcId rpcId) { + log.trace("Executing findById, tenantId [{}], rpcId [{}]", tenantId, rpcId); + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); + validateId(rpcId, INCORRECT_RPC_ID + rpcId); + return rpcDao.findById(tenantId, rpcId.getId()); + } + + @Override + public ListenableFuture findRpcByIdAsync(TenantId tenantId, RpcId rpcId) { + log.trace("Executing findRpcByIdAsync, tenantId [{}], rpcId: [{}]", tenantId, rpcId); + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); + validateId(rpcId, INCORRECT_RPC_ID + rpcId); + return rpcDao.findByIdAsync(tenantId, rpcId.getId()); + } + + @Override + public PageData findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) { + log.trace("Executing findAllByDeviceIdAndStatus, tenantId [{}], deviceId [{}], rpcStatus [{}], pageLink [{}]", tenantId, deviceId, rpcStatus, pageLink); + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); + validatePageLink(pageLink); + return rpcDao.findAllByDeviceId(tenantId, deviceId, rpcStatus, pageLink); + } + + private PaginatedRemover tenantRpcRemover = + new PaginatedRemover<>() { + @Override + protected PageData findEntities(TenantId tenantId, TenantId id, PageLink pageLink) { + return rpcDao.findAllRpcByTenantId(id, pageLink); + } + + @Override + protected void removeEntity(TenantId tenantId, Rpc entity) { + deleteRpc(tenantId, entity.getId()); + } + }; +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java b/dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java new file mode 100644 index 0000000000..63af784dbb --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.rpc; + +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.rpc.Rpc; +import org.thingsboard.server.common.data.rpc.RpcStatus; +import org.thingsboard.server.dao.Dao; + +public interface RpcDao extends Dao { + PageData findAllByDeviceId(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink); + + PageData findAllRpcByTenantId(TenantId tenantId, PageLink pageLink); + + Long deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime); +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDao.java new file mode 100644 index 0000000000..221ef17361 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/JpaRpcDao.java @@ -0,0 +1,66 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql.rpc; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.repository.CrudRepository; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.rpc.Rpc; +import org.thingsboard.server.common.data.rpc.RpcStatus; +import org.thingsboard.server.dao.DaoUtil; +import org.thingsboard.server.dao.model.sql.RpcEntity; +import org.thingsboard.server.dao.rpc.RpcDao; +import org.thingsboard.server.dao.sql.JpaAbstractDao; + +import java.util.UUID; + +@Slf4j +@Component +@AllArgsConstructor +public class JpaRpcDao extends JpaAbstractDao implements RpcDao { + + private final RpcRepository rpcRepository; + + @Override + protected Class getEntityClass() { + return RpcEntity.class; + } + + @Override + protected CrudRepository getCrudRepository() { + return rpcRepository; + } + + @Override + public PageData findAllByDeviceId(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) { + return DaoUtil.toPageData(rpcRepository.findAllByTenantIdAndDeviceIdAndStatus(tenantId.getId(), deviceId.getId(), rpcStatus, DaoUtil.toPageable(pageLink))); + } + + @Override + public PageData findAllRpcByTenantId(TenantId tenantId, PageLink pageLink) { + return DaoUtil.toPageData(rpcRepository.findAllByTenantId(tenantId.getId(), DaoUtil.toPageable(pageLink))); + } + + @Override + public Long deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime) { + return rpcRepository.deleteOutdatedRpcByTenantId(tenantId.getId(), expirationTime); + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java new file mode 100644 index 0000000000..76b67b3823 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/rpc/RpcRepository.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql.rpc; + +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.CrudRepository; +import org.springframework.data.repository.query.Param; +import org.thingsboard.server.common.data.rpc.RpcStatus; +import org.thingsboard.server.dao.model.sql.RpcEntity; + +import java.util.UUID; + +public interface RpcRepository extends CrudRepository { + Page findAllByTenantIdAndDeviceIdAndStatus(UUID tenantId, UUID deviceId, RpcStatus status, Pageable pageable); + + Page findAllByTenantId(UUID tenantId, Pageable pageable); + + @Query(value = "WITH deleted AS (DELETE FROM rpc WHERE (tenant_id = :tenantId AND created_time < :expirationTime) IS TRUE RETURNING *) SELECT count(*) FROM deleted", + nativeQuery = true) + Long deleteOutdatedRpcByTenantId(@Param("tenantId") UUID tenantId, @Param("expirationTime") Long expirationTime); +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java index c5050467dd..eab0a070f5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java @@ -37,6 +37,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.ota.OtaPackageService; import org.thingsboard.server.dao.resource.ResourceService; +import org.thingsboard.server.dao.rpc.RpcService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.PaginatedRemover; @@ -96,6 +97,9 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe @Autowired private OtaPackageService otaPackageService; + @Autowired + private RpcService rpcService; + @Override public Tenant findTenantById(TenantId tenantId) { log.trace("Executing findTenantById [{}]", tenantId); @@ -151,6 +155,7 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe apiUsageStateService.deleteApiUsageStateByTenantId(tenantId); resourceService.deleteResourcesByTenantId(tenantId); otaPackageService.deleteOtaPackagesByTenantId(tenantId); + rpcService.deleteAllRpcByTenantId(tenantId); tenantDao.removeById(tenantId, tenantId.getId()); deleteEntityRelations(tenantId, tenantId); } diff --git a/dao/src/main/resources/sql/schema-entities-hsql.sql b/dao/src/main/resources/sql/schema-entities-hsql.sql index ca7cd73604..812f65fce5 100644 --- a/dao/src/main/resources/sql/schema-entities-hsql.sql +++ b/dao/src/main/resources/sql/schema-entities-hsql.sql @@ -567,3 +567,14 @@ CREATE TABLE IF NOT EXISTS edge_event ( tenant_id uuid, ts bigint NOT NULL ); + +CREATE TABLE IF NOT EXISTS rpc ( + id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY, + created_time bigint NOT NULL, + tenant_id uuid NOT NULL, + device_id uuid NOT NULL, + expiration_time bigint NOT NULL, + request varchar(10000000) NOT NULL, + response varchar(10000000), + status varchar(255) NOT NULL +); diff --git a/dao/src/main/resources/sql/schema-entities-idx.sql b/dao/src/main/resources/sql/schema-entities-idx.sql index 28bbe4311e..d649e44418 100644 --- a/dao/src/main/resources/sql/schema-entities-idx.sql +++ b/dao/src/main/resources/sql/schema-entities-idx.sql @@ -46,3 +46,4 @@ CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribu CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time); +CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id); diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index 303bf6f0f9..4df52fb7bc 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -605,6 +605,17 @@ CREATE TABLE IF NOT EXISTS edge_event ( ts bigint NOT NULL ); +CREATE TABLE IF NOT EXISTS rpc ( + id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY, + created_time bigint NOT NULL, + tenant_id uuid NOT NULL, + device_id uuid NOT NULL, + expiration_time bigint NOT NULL, + request varchar(10000000) NOT NULL, + response varchar(10000000), + status varchar(255) NOT NULL +); + CREATE OR REPLACE PROCEDURE cleanup_events_by_ttl(IN ttl bigint, IN debug_ttl bigint, INOUT deleted bigint) LANGUAGE plpgsql AS $$ diff --git a/dao/src/test/resources/sql/hsql/drop-all-tables.sql b/dao/src/test/resources/sql/hsql/drop-all-tables.sql index f7b1b4118d..2cd7e26b48 100644 --- a/dao/src/test/resources/sql/hsql/drop-all-tables.sql +++ b/dao/src/test/resources/sql/hsql/drop-all-tables.sql @@ -36,4 +36,5 @@ DROP TABLE IF EXISTS resource; DROP TABLE IF EXISTS ota_package; DROP TABLE IF EXISTS edge; DROP TABLE IF EXISTS edge_event; +DROP TABLE IF EXISTS rpc; DROP FUNCTION IF EXISTS to_uuid; diff --git a/dao/src/test/resources/sql/psql/drop-all-tables.sql b/dao/src/test/resources/sql/psql/drop-all-tables.sql index a29dea43c2..0a71953032 100644 --- a/dao/src/test/resources/sql/psql/drop-all-tables.sql +++ b/dao/src/test/resources/sql/psql/drop-all-tables.sql @@ -37,3 +37,4 @@ DROP TABLE IF EXISTS resource; DROP TABLE IF EXISTS firmware; DROP TABLE IF EXISTS edge; DROP TABLE IF EXISTS edge_event; +DROP TABLE IF EXISTS rpc; diff --git a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java index 2d0914c7d4..0c4b673d7f 100644 --- a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java +++ b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java @@ -2378,7 +2378,7 @@ public class RestClient implements ClientHttpRequestInterceptor, Closeable { public void setUserCredentialsEnabled(UserId userId, boolean userCredentialsEnabled) { restTemplate.postForLocation( - baseURL + "/api/user/{userId}/userCredentialsEnabled?serCredentialsEnabled={serCredentialsEnabled}", + baseURL + "/api/user/{userId}/userCredentialsEnabled?userCredentialsEnabled={userCredentialsEnabled}", null, userId.getId(), userCredentialsEnabled); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java index 856dbf8e60..1d74040b34 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java @@ -35,6 +35,7 @@ public final class RuleEngineDeviceRpcRequest { private final UUID requestUUID; private final String originServiceId; private final boolean oneway; + private final boolean persisted; private final String method; private final String body; private final long expirationTime; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java index 7969836e00..85c2a156b3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java @@ -33,8 +33,8 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; type = ComponentType.FILTER, name = "message type switch", configClazz = EmptyNodeConfiguration.class, - relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "Activity Event", "Inactivity Event", - "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned", + relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed", + "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned", "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant", "Timeseries Updated", "Timeseries Deleted"}, nodeDescription = "Route incoming messages by Message Type", @@ -95,6 +95,16 @@ public class TbMsgTypeSwitchNode implements TbNode { relationType = "Timeseries Updated"; } else if (msg.getType().equals(DataConstants.TIMESERIES_DELETED)) { relationType = "Timeseries Deleted"; + } else if (msg.getType().equals(DataConstants.RPC_QUEUED)) { + relationType = "RPC Queued"; + } else if (msg.getType().equals(DataConstants.RPC_DELIVERED)) { + relationType = "RPC Delivered"; + } else if (msg.getType().equals(DataConstants.RPC_SUCCESSFUL)) { + relationType = "RPC Successful"; + } else if (msg.getType().equals(DataConstants.RPC_TIMEOUT)) { + relationType = "RPC Timeout"; + } else if (msg.getType().equals(DataConstants.RPC_FAILED)) { + relationType = "RPC Failed"; } else { relationType = "Other"; } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java index 31bd913f47..1867ae4805 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java @@ -81,6 +81,9 @@ public class TbSendRPCRequestNode implements TbNode { tmp = msg.getMetaData().getValue("oneway"); boolean oneway = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp); + tmp = msg.getMetaData().getValue("persisted"); + boolean persisted = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp); + tmp = msg.getMetaData().getValue("requestUUID"); UUID requestUUID = !StringUtils.isEmpty(tmp) ? UUID.fromString(tmp) : Uuids.timeBased(); tmp = msg.getMetaData().getValue("originServiceId"); @@ -108,6 +111,7 @@ public class TbSendRPCRequestNode implements TbNode { .originServiceId(originServiceId) .expirationTime(expirationTime) .restApiCall(restApiCall) + .persisted(persisted) .build(); ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { diff --git a/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.html b/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.html index 4dd248a10c..66cb503bd3 100644 --- a/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.html +++ b/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.html @@ -196,6 +196,18 @@ {{ 'tenant-profile.alarms-ttl-days-days-range' | translate}} + + tenant-profile.rpc-ttl-days + + + {{ 'tenant-profile.rpc-ttl-days-required' | translate}} + + + {{ 'tenant-profile.rpc-ttl-days-days-range' | translate}} + + tenant-profile.max-rule-node-executions-per-message { this.updateModel(); diff --git a/ui-ngx/src/app/shared/models/rule-node.models.ts b/ui-ngx/src/app/shared/models/rule-node.models.ts index 73ff97b061..489c6eef10 100644 --- a/ui-ngx/src/app/shared/models/rule-node.models.ts +++ b/ui-ngx/src/app/shared/models/rule-node.models.ts @@ -352,7 +352,12 @@ export enum MessageType { ATTRIBUTES_UPDATED = 'ATTRIBUTES_UPDATED', ATTRIBUTES_DELETED = 'ATTRIBUTES_DELETED', TIMESERIES_UPDATED = 'TIMESERIES_UPDATED', - TIMESERIES_DELETED = 'TIMESERIES_DELETED' + TIMESERIES_DELETED = 'TIMESERIES_DELETED', + RPC_QUEUED = 'RPC_QUEUED', + RPC_DELIVERED = 'RPC_DELIVERED', + RPC_SUCCESSFUL = 'RPC_SUCCESSFUL', + RPC_TIMEOUT = 'RPC_TIMEOUT', + RPC_FAILED = 'RPC_FAILED' } export const messageTypeNames = new Map( @@ -373,7 +378,12 @@ export const messageTypeNames = new Map( [MessageType.ATTRIBUTES_UPDATED, 'Attributes Updated'], [MessageType.ATTRIBUTES_DELETED, 'Attributes Deleted'], [MessageType.TIMESERIES_UPDATED, 'Timeseries Updated'], - [MessageType.TIMESERIES_DELETED, 'Timeseries Deleted'] + [MessageType.TIMESERIES_DELETED, 'Timeseries Deleted'], + [MessageType.RPC_QUEUED, 'RPC Queued'], + [MessageType.RPC_DELIVERED, 'RPC Delivered'], + [MessageType.RPC_SUCCESSFUL, 'RPC Successful'], + [MessageType.RPC_TIMEOUT, 'RPC Timeout'], + [MessageType.RPC_FAILED, 'RPC Failed'] ] ); diff --git a/ui-ngx/src/app/shared/models/tenant.model.ts b/ui-ngx/src/app/shared/models/tenant.model.ts index badc48bea4..649468ec08 100644 --- a/ui-ngx/src/app/shared/models/tenant.model.ts +++ b/ui-ngx/src/app/shared/models/tenant.model.ts @@ -53,6 +53,7 @@ export interface DefaultTenantProfileConfiguration { defaultStorageTtlDays: number; alarmsTtlDays: number; + rpcTtlDays: number; } export type TenantProfileConfigurations = DefaultTenantProfileConfiguration; @@ -85,7 +86,8 @@ export function createTenantProfileConfiguration(type: TenantProfileType): Tenan maxSms: 0, maxCreatedAlarms: 0, defaultStorageTtlDays: 0, - alarmsTtlDays: 0 + alarmsTtlDays: 0, + rpcTtlDays: 0 }; configuration = {...defaultConfiguration, type: TenantProfileType.DEFAULT}; break; diff --git a/ui-ngx/src/assets/locale/locale.constant-en_US.json b/ui-ngx/src/assets/locale/locale.constant-en_US.json index 791fd97cc3..4af8c7a87a 100644 --- a/ui-ngx/src/assets/locale/locale.constant-en_US.json +++ b/ui-ngx/src/assets/locale/locale.constant-en_US.json @@ -2638,6 +2638,9 @@ "alarms-ttl-days": "Alarms TTL days (0 - unlimited)", "alarms-ttl-days-required": "Alarms TTL days required", "alarms-ttl-days-days-range": "Alarms TTL days can't be negative", + "rpc-ttl-days": "RPC TTL days (0 - unlimited)", + "rpc-ttl-days-required": "RPC TTL days required", + "rpc-ttl-days-days-range": "RPC TTL days can't be negative", "max-rule-node-executions-per-message": "Maximum number of rule node executions per message (0 - unlimited)", "max-rule-node-executions-per-message-required": "Maximum number of rule node executions per message is required.", "max-rule-node-executions-per-message-range": "Maximum number of rule node executions per message can't be negative",