Merge pull request #4722 from YevhenBondarenko/feature/persisted-rpc

implemented persisted RPC
This commit is contained in:
Andrew Shvayka 2021-06-22 18:28:12 +03:00 committed by GitHub
commit 8aa54b7aa5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
57 changed files with 1204 additions and 89 deletions

View File

@ -197,3 +197,17 @@ $$;
ALTER TABLE api_usage_state
ADD COLUMN IF NOT EXISTS alarm_exec VARCHAR(32);
UPDATE api_usage_state SET alarm_exec = 'ENABLED' WHERE alarm_exec IS NULL;
CREATE TABLE IF NOT EXISTS rpc (
id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY,
created_time bigint NOT NULL,
tenant_id uuid NOT NULL,
device_id uuid NOT NULL,
expiration_time bigint NOT NULL,
request varchar(10000000) NOT NULL,
response varchar(10000000),
status varchar(255) NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id);

View File

@ -65,6 +65,7 @@ import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.resource.ResourceService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.rule.RuleNodeStateService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
@ -80,9 +81,9 @@ import org.thingsboard.server.service.executors.ExternalCallExecutorService;
import org.thingsboard.server.service.executors.SharedEventLoopGroupService;
import org.thingsboard.server.service.mail.MailExecutorService;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.service.queue.TbClusterService;
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
import org.thingsboard.server.service.rpc.TbRpcService;
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
import org.thingsboard.server.service.script.JsExecutorService;
import org.thingsboard.server.service.script.JsInvokeService;
@ -303,23 +304,33 @@ public class ActorSystemContext {
@Lazy
@Autowired(required = false)
@Getter private EdgeService edgeService;
@Getter
private EdgeService edgeService;
@Lazy
@Autowired(required = false)
@Getter private EdgeEventService edgeEventService;
@Getter
private EdgeEventService edgeEventService;
@Lazy
@Autowired(required = false)
@Getter private EdgeRpcService edgeRpcService;
@Getter
private EdgeRpcService edgeRpcService;
@Lazy
@Autowired(required = false)
@Getter private ResourceService resourceService;
@Getter
private ResourceService resourceService;
@Lazy
@Autowired(required = false)
@Getter private OtaPackageService otaPackageService;
@Getter
private OtaPackageService otaPackageService;
@Lazy
@Autowired(required = false)
@Getter
private TbRpcService tbRpcService;
@Value("${actors.session.max_concurrent_sessions_per_device:1}")
@Getter

View File

@ -46,7 +46,7 @@ public class DeviceActor extends ContextAwareActor {
super.init(ctx);
log.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId);
try {
processor.initSessionTimeout(ctx);
processor.init(ctx);
log.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId);
} catch (Exception e) {
log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.deviceId, e);

View File

@ -23,6 +23,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RpcError;
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg;
@ -38,12 +39,17 @@ import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
@ -52,8 +58,8 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
@ -68,10 +74,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToDevicePersistedRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
@ -162,20 +170,19 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
void processRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg) {
ToDeviceRpcRequest request = msg.getMsg();
ToDeviceRpcRequestBody body = request.getBody();
ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder()
.setRequestId(rpcSeq++)
.setMethodName(body.getMethod())
.setParams(body.getParams())
.setExpirationTime(request.getExpirationTime())
.setRequestIdMSB(request.getId().getMostSignificantBits())
.setRequestIdLSB(request.getId().getLeastSignificantBits())
.build();
ToDeviceRpcRequestMsg rpcRequest = creteToDeviceRpcRequestMsg(request);
long timeout = request.getExpirationTime() - System.currentTimeMillis();
boolean persisted = request.isPersisted();
if (timeout <= 0) {
log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime());
if (persisted) {
createRpc(request, RpcStatus.TIMEOUT);
}
return;
} else if (persisted) {
createRpc(request, RpcStatus.QUEUED);
}
boolean sent;
@ -192,10 +199,16 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
syncSessionSet.add(key);
}
});
log.trace("46) Rpc syncSessionSet [{}] subscription after sent [{}]",syncSessionSet, rpcSubscriptions);
log.trace("46) Rpc syncSessionSet [{}] subscription after sent [{}]", syncSessionSet, rpcSubscriptions);
syncSessionSet.forEach(rpcSubscriptions::remove);
}
if (persisted && !(sent || request.isOneway())) {
ObjectNode response = JacksonUtil.newObjectNode();
response.put("rpcId", request.getId().toString());
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), JacksonUtil.toString(response), null));
}
if (request.isOneway() && sent) {
log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null));
@ -209,6 +222,32 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
}
}
private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) {
Rpc rpc = new Rpc(new RpcId(request.getId()));
rpc.setCreatedTime(System.currentTimeMillis());
rpc.setTenantId(tenantId);
rpc.setDeviceId(deviceId);
rpc.setExpirationTime(request.getExpirationTime());
rpc.setRequest(JacksonUtil.valueToTree(request));
rpc.setStatus(status);
systemContext.getTbRpcService().save(tenantId, rpc);
return systemContext.getTbRpcService().save(tenantId, rpc);
}
private ToDeviceRpcRequestMsg creteToDeviceRpcRequestMsg(ToDeviceRpcRequest request) {
ToDeviceRpcRequestBody body = request.getBody();
return ToDeviceRpcRequestMsg.newBuilder()
.setRequestId(rpcSeq++)
.setMethodName(body.getMethod())
.setParams(body.getParams())
.setExpirationTime(request.getExpirationTime())
.setRequestIdMSB(request.getId().getMostSignificantBits())
.setRequestIdLSB(request.getId().getLeastSignificantBits())
.setOneway(request.isOneway())
.setPersisted(request.isPersisted())
.build();
}
void processRpcResponsesFromEdge(TbActorCtx context, FromDeviceRpcResponseActorMsg responseMsg) {
log.debug("[{}] Processing rpc command response from edge session", deviceId);
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
@ -230,6 +269,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
if (requestMd != null) {
log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.TIMEOUT, null);
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
}
@ -271,7 +311,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
.setExpirationTime(request.getExpirationTime())
.setRequestIdMSB(request.getId().getMostSignificantBits())
.setRequestIdLSB(request.getId().getLeastSignificantBits())
.setOneway(request.isOneway())
.setPersisted(request.isPersisted())
.build();
sendToTransport(rpcRequest, sessionId, nodeId);
};
}
@ -279,31 +322,39 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
void process(TbActorCtx context, TransportToDeviceActorMsgWrapper wrapper) {
TransportToDeviceActorMsg msg = wrapper.getMsg();
TbCallback callback = wrapper.getCallback();
var sessionInfo = msg.getSessionInfo();
if (msg.hasSessionEvent()) {
processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent());
processSessionStateMsgs(sessionInfo, msg.getSessionEvent());
}
if (msg.hasSubscribeToAttributes()) {
processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToAttributes());
processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToAttributes());
}
if (msg.hasSubscribeToRPC()) {
processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToRPC());
processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToRPC());
}
if (msg.hasSendPendingRPC()) {
sendPendingRequests(context, getSessionId(sessionInfo), sessionInfo);
}
if (msg.hasGetAttributes()) {
handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes());
handleGetAttributesRequest(context, sessionInfo, msg.getGetAttributes());
}
if (msg.hasToDeviceRPCCallResponse()) {
processRpcResponses(context, msg.getSessionInfo(), msg.getToDeviceRPCCallResponse());
processRpcResponses(context, sessionInfo, msg.getToDeviceRPCCallResponse());
}
if (msg.hasSubscriptionInfo()) {
handleSessionActivity(context, msg.getSessionInfo(), msg.getSubscriptionInfo());
handleSessionActivity(context, sessionInfo, msg.getSubscriptionInfo());
}
if (msg.hasClaimDevice()) {
handleClaimDeviceMsg(context, msg.getSessionInfo(), msg.getClaimDevice());
handleClaimDeviceMsg(context, sessionInfo, msg.getClaimDevice());
}
if (msg.hasPersistedRpcResponseMsg()) {
processPersistedRpcResponses(context, sessionInfo, msg.getPersistedRpcResponseMsg());
}
callback.onSuccess();
}
private void handleClaimDeviceMsg(TbActorCtx context, SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg) {
private void handleClaimDeviceMsg(TbActorCtx context, SessionInfoProto sessionInfo, ClaimDeviceMsg msg) {
DeviceId deviceId = new DeviceId(new UUID(msg.getDeviceIdMSB(), msg.getDeviceIdLSB()));
systemContext.getClaimDevicesService().registerClaimingInfo(tenantId, deviceId, msg.getSecretKey(), msg.getDurationMs());
}
@ -442,11 +493,22 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
if (success) {
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
responseMsg.getPayload(), null));
if (requestMd.getMsg().getMsg().isPersisted()) {
systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.SUCCESSFUL, JacksonUtil.toJsonNode(responseMsg.getPayload()));
}
} else {
log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
if (requestMd.getMsg().getMsg().isPersisted()) {
systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.FAILED, JacksonUtil.toJsonNode(responseMsg.getPayload()));
}
}
}
private void processPersistedRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDevicePersistedRpcResponseMsg responseMsg) {
UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB());
systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), RpcStatus.valueOf(responseMsg.getStatus()), null);
}
private void processSubscriptionCommands(TbActorCtx context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
UUID sessionId = getSessionId(sessionInfo);
if (subscribeCmd.getUnsubscribe()) {
@ -565,7 +627,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
void notifyTransportAboutProfileUpdate(UUID sessionId, SessionInfoMetaData sessionMd, DeviceCredentials deviceCredentials) {
log.info("2) LwM2Mtype: ");
TransportProtos.ToTransportUpdateCredentialsProto.Builder notification = TransportProtos.ToTransportUpdateCredentialsProto.newBuilder();
ToTransportUpdateCredentialsProto.Builder notification = ToTransportUpdateCredentialsProto.newBuilder();
notification.addCredentialsId(deviceCredentials.getCredentialsId());
notification.addCredentialsValue(deviceCredentials.getCredentialsValue());
ToTransportMsg msg = ToTransportMsg.newBuilder()
@ -640,7 +702,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
ListenableFuture<EdgeEvent> future = systemContext.getEdgeEventService().saveAsync(edgeEvent);
Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
@Override
public void onSuccess( EdgeEvent result) {
public void onSuccess(EdgeEvent result) {
systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId);
}
@ -756,8 +818,26 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
.addAllSessions(sessionsList).build().toByteArray());
}
void initSessionTimeout(TbActorCtx ctx) {
void init(TbActorCtx ctx) {
schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout());
PageLink pageLink = new PageLink(1024);
PageData<Rpc> pageData;
do {
pageData = systemContext.getTbRpcService().findAllByDeviceIdAndStatus(tenantId, deviceId, RpcStatus.QUEUED, pageLink);
pageData.getData().forEach(rpc -> {
ToDeviceRpcRequest msg = JacksonUtil.convertValue(rpc.getRequest(), ToDeviceRpcRequest.class);
long timeout = rpc.getExpirationTime() - System.currentTimeMillis();
if (timeout <= 0) {
rpc.setStatus(RpcStatus.TIMEOUT);
systemContext.getTbRpcService().save(tenantId, rpc);
} else {
registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, creteToDeviceRpcRequestMsg(msg), timeout);
}
});
if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
}
} while (pageData.hasNext());
}
void checkSessionsTimeout() {

View File

@ -69,6 +69,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.OtaPackageId;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.TbResourceId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
@ -83,6 +84,7 @@ import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.plugin.ComponentDescriptor;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.data.rule.RuleNode;
@ -106,6 +108,7 @@ import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.oauth2.OAuth2ConfigTemplateService;
import org.thingsboard.server.dao.oauth2.OAuth2Service;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.rpc.RpcService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantProfileService;
@ -245,6 +248,9 @@ public abstract class BaseController {
@Autowired
protected OtaPackageStateService otaPackageStateService;
@Autowired
protected RpcService rpcService;
@Autowired
protected TbQueueProducerProvider producerProvider;
@ -786,6 +792,18 @@ public abstract class BaseController {
}
}
Rpc checkRpcId(RpcId rpcId, Operation operation) throws ThingsboardException {
try {
validateId(rpcId, "Incorrect rpcId " + rpcId);
Rpc rpc = rpcService.findById(getCurrentUser().getTenantId(), rpcId);
checkNotNull(rpc);
accessControlService.checkPermission(getCurrentUser(), Resource.RPC, operation, rpcId, rpc);
return rpc;
} catch (Exception e) {
throw handleException(e, false);
}
}
@SuppressWarnings("unchecked")
protected <I extends EntityId> I emptyId(EntityType entityType) {
return (I) EntityIdFactory.getByTypeAndUuid(entityType, ModelConstants.NULL_UUID);

View File

@ -29,6 +29,7 @@ import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
@ -38,8 +39,13 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.queue.util.TbCoreComponent;
@ -93,6 +99,52 @@ public class RpcController extends BaseController {
return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody);
}
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/persisted/{rpcId}", method = RequestMethod.GET)
@ResponseBody
public Rpc getPersistedRpc(@PathVariable("rpcId") String strRpc) throws ThingsboardException {
checkParameter("RpcId", strRpc);
try {
RpcId rpcId = new RpcId(UUID.fromString(strRpc));
return checkRpcId(rpcId, Operation.READ);
} catch (Exception e) {
throw handleException(e);
}
}
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/persisted/{deviceId}", method = RequestMethod.GET)
@ResponseBody
public PageData<Rpc> getPersistedRpcByDevice(@PathVariable("deviceId") String strDeviceId,
@RequestParam int pageSize,
@RequestParam int page,
@RequestParam RpcStatus rpcStatus,
@RequestParam(required = false) String textSearch,
@RequestParam(required = false) String sortProperty,
@RequestParam(required = false) String sortOrder) throws ThingsboardException {
checkParameter("DeviceId", strDeviceId);
try {
TenantId tenantId = getCurrentUser().getTenantId();
PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder);
DeviceId deviceId = new DeviceId(UUID.fromString(strDeviceId));
return checkNotNull(rpcService.findAllByDeviceIdAndStatus(tenantId, deviceId, rpcStatus, pageLink));
} catch (Exception e) {
throw handleException(e);
}
}
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
@RequestMapping(value = "/persisted/{rpcId}", method = RequestMethod.DELETE)
@ResponseBody
public void deleteResource(@PathVariable("rpcId") String strRpc) throws ThingsboardException {
checkParameter("RpcId", strRpc);
try {
rpcService.deleteRpc(getTenantId(), new RpcId(UUID.fromString(strRpc)));
} catch (Exception e) {
throw handleException(e);
}
}
private DeferredResult<ResponseEntity> handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody) throws ThingsboardException {
try {
JsonNode rpcRequestBody = jsonMapper.readTree(requestBody);
@ -103,6 +155,7 @@ public class RpcController extends BaseController {
long timeout = rpcRequestBody.has("timeout") ? rpcRequestBody.get("timeout").asLong() : defaultTimeout;
long expTime = System.currentTimeMillis() + Math.max(minTimeout, timeout);
UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID();
boolean persisted = rpcRequestBody.has("persisted") && rpcRequestBody.get("persisted").asBoolean();
accessValidator.validate(currentUser, Operation.RPC_CALL, deviceId, new HttpValidationCallback(response, new FutureCallback<DeferredResult<ResponseEntity>>() {
@Override
public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
@ -111,7 +164,8 @@ public class RpcController extends BaseController {
deviceId,
oneWay,
expTime,
body
body,
persisted
);
deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse), currentUser);
}

View File

@ -157,6 +157,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
metaData.putValue("originServiceId", serviceId);
metaData.putValue("expirationTime", Long.toString(msg.getExpirationTime()));
metaData.putValue("oneway", Boolean.toString(msg.isOneway()));
metaData.putValue("persisted", Boolean.toString(msg.isPersisted()));
Device device = deviceService.findDeviceById(msg.getTenantId(), msg.getDeviceId());
if (device != null) {

View File

@ -100,7 +100,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
@Override
public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(),
src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()));
src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted());
forwardRpcRequestToDeviceActor(request, response -> {
if (src.isRestApiCall()) {
sendRpcResponseToTbCore(src.getOriginServiceId(), response);

View File

@ -0,0 +1,77 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.rpc;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.rpc.RpcService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.queue.TbClusterService;
@TbCoreComponent
@Service
@RequiredArgsConstructor
@Slf4j
public class TbRpcService {
private final RpcService rpcService;
private final TbClusterService tbClusterService;
public Rpc save(TenantId tenantId, Rpc rpc) {
Rpc saved = rpcService.save(rpc);
pushRpcMsgToRuleEngine(tenantId, saved);
return saved;
}
public void save(TenantId tenantId, RpcId rpcId, RpcStatus newStatus, JsonNode response) {
Rpc foundRpc = rpcService.findById(tenantId, rpcId);
if (foundRpc != null) {
foundRpc.setStatus(newStatus);
if (response != null) {
foundRpc.setResponse(response);
}
Rpc saved = rpcService.save(foundRpc);
pushRpcMsgToRuleEngine(tenantId, saved);
} else {
log.warn("[{}] Failed to update RPC status because RPC was already deleted", rpcId);
}
}
private void pushRpcMsgToRuleEngine(TenantId tenantId, Rpc rpc) {
TbMsg msg = TbMsg.newMsg("RPC_" + rpc.getStatus().name(), rpc.getDeviceId(), TbMsgMetaData.EMPTY, JacksonUtil.toString(rpc));
tbClusterService.pushMsgToRuleEngine(tenantId, rpc.getId(), msg, null);
}
public Rpc findRpcById(TenantId tenantId, RpcId rpcId) {
return rpcService.findById(tenantId, rpcId);
}
public PageData<Rpc> findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) {
return rpcService.findAllByDeviceIdAndStatus(tenantId, deviceId, rpcStatus, pageLink);
}
}

View File

@ -47,11 +47,13 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.OtaPackageId;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TbResourceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.controller.HttpValidationCallback;
@ -65,6 +67,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.ota.OtaPackageService;
import org.thingsboard.server.dao.resource.ResourceService;
import org.thingsboard.server.dao.rpc.RpcService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
@ -137,6 +140,9 @@ public class AccessValidator {
@Autowired
protected OtaPackageService otaPackageService;
@Autowired
protected RpcService rpcService;
private ExecutorService executor;
@PostConstruct
@ -235,6 +241,9 @@ public class AccessValidator {
case OTA_PACKAGE:
validateOtaPackage(currentUser, operation, entityId, callback);
return;
case RPC:
validateRpc(currentUser, operation, entityId, callback);
return;
default:
//TODO: add support of other entities
throw new IllegalStateException("Not Implemented!");
@ -261,6 +270,22 @@ public class AccessValidator {
}
}
private void validateRpc(final SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) {
ListenableFuture<Rpc> rpcFurure = rpcService.findRpcByIdAsync(currentUser.getTenantId(), new RpcId(entityId.getId()));
Futures.addCallback(rpcFurure, getCallback(callback, rpc -> {
if (rpc == null) {
return ValidationResult.entityNotFound("Rpc with requested id wasn't found!");
} else {
try {
accessControlService.checkPermission(currentUser, Resource.RPC, operation, entityId, rpc);
} catch (ThingsboardException e) {
return ValidationResult.accessDenied(e.getMessage());
}
return ValidationResult.ok(rpc);
}
}), executor);
}
private void validateDeviceProfile(final SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) {
if (currentUser.isSystemAdmin()) {
callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));

View File

@ -41,6 +41,7 @@ public class CustomerUserPermissions extends AbstractPermissions {
put(Resource.WIDGETS_BUNDLE, widgetsPermissionChecker);
put(Resource.WIDGET_TYPE, widgetsPermissionChecker);
put(Resource.EDGE, customerEntityPermissionChecker);
put(Resource.RPC, rpcPermissionChecker);
}
private static final PermissionChecker customerEntityPermissionChecker =
@ -138,4 +139,22 @@ public class CustomerUserPermissions extends AbstractPermissions {
}
};
private static final PermissionChecker rpcPermissionChecker = new PermissionChecker.GenericPermissionChecker(Operation.READ) {
@Override
@SuppressWarnings("unchecked")
public boolean hasPermission(SecurityUser user, Operation operation, EntityId entityId, HasTenantId entity) {
if (!super.hasPermission(user, operation, entityId, entity)) {
return false;
}
if (entity.getTenantId() == null || entity.getTenantId().isNullUid()) {
return true;
}
if (!user.getTenantId().equals(entity.getTenantId())) {
return false;
}
return true;
}
};
}

View File

@ -39,7 +39,8 @@ public enum Resource {
API_USAGE_STATE(EntityType.API_USAGE_STATE),
TB_RESOURCE(EntityType.TB_RESOURCE),
OTA_PACKAGE(EntityType.OTA_PACKAGE),
EDGE(EntityType.EDGE);
EDGE(EntityType.EDGE),
RPC(EntityType.RPC);
private final EntityType entityType;

View File

@ -44,6 +44,7 @@ public class TenantAdminPermissions extends AbstractPermissions {
put(Resource.TB_RESOURCE, tbResourcePermissionChecker);
put(Resource.OTA_PACKAGE, tenantEntityPermissionChecker);
put(Resource.EDGE, tenantEntityPermissionChecker);
put(Resource.RPC, tenantEntityPermissionChecker);
}
public static final PermissionChecker tenantEntityPermissionChecker = new PermissionChecker() {

View File

@ -22,6 +22,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
@ -41,13 +42,13 @@ import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.device.credentials.BasicMqttCredentials;
import org.thingsboard.server.common.data.device.credentials.ProvisionDeviceCredentialsData;
import org.thingsboard.server.common.data.device.profile.ProvisionDeviceProfileCredentials;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.common.data.ota.OtaPackageUtil;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.OtaPackageId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.common.data.ota.OtaPackageUtil;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
@ -108,6 +109,7 @@ import java.util.stream.Collectors;
@Slf4j
@Service
@TbCoreComponent
@RequiredArgsConstructor
public class DefaultTransportApiService implements TransportApiService {
private static final ObjectMapper mapper = new ObjectMapper();
@ -129,28 +131,6 @@ public class DefaultTransportApiService implements TransportApiService {
private final ConcurrentMap<String, ReentrantLock> deviceCreationLocks = new ConcurrentHashMap<>();
public DefaultTransportApiService(TbDeviceProfileCache deviceProfileCache,
TbTenantProfileCache tenantProfileCache, TbApiUsageStateService apiUsageStateService, DeviceService deviceService,
RelationService relationService, DeviceCredentialsService deviceCredentialsService,
DeviceStateService deviceStateService, DbCallbackExecutorService dbCallbackExecutorService,
TbClusterService tbClusterService, DataDecodingEncodingService dataDecodingEncodingService,
DeviceProvisionService deviceProvisionService, TbResourceService resourceService, OtaPackageService otaPackageService, OtaPackageDataCache otaPackageDataCache) {
this.deviceProfileCache = deviceProfileCache;
this.tenantProfileCache = tenantProfileCache;
this.apiUsageStateService = apiUsageStateService;
this.deviceService = deviceService;
this.relationService = relationService;
this.deviceCredentialsService = deviceCredentialsService;
this.deviceStateService = deviceStateService;
this.dbCallbackExecutorService = dbCallbackExecutorService;
this.tbClusterService = tbClusterService;
this.dataDecodingEncodingService = dataDecodingEncodingService;
this.deviceProvisionService = deviceProvisionService;
this.resourceService = resourceService;
this.otaPackageService = otaPackageService;
this.otaPackageDataCache = otaPackageDataCache;
}
@Override
public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProtoQueueMsg<TransportApiRequestMsg> tbProtoQueueMsg) {
TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue();

View File

@ -0,0 +1,83 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ttl.rpc;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.dao.rpc.RpcDao;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantDao;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@TbCoreComponent
@Service
@Slf4j
@RequiredArgsConstructor
public class RpcCleanUpService {
@Value("${sql.ttl.rpc.enabled}")
private boolean ttlTaskExecutionEnabled;
private final TenantDao tenantDao;
private final PartitionService partitionService;
private final TbTenantProfileCache tenantProfileCache;
private final RpcDao rpcDao;
@Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.rpc.checking_interval})}", fixedDelayString = "${sql.ttl.rpc.checking_interval}")
public void cleanUp() {
if (ttlTaskExecutionEnabled) {
PageLink tenantsBatchRequest = new PageLink(10_000, 0);
PageData<TenantId> tenantsIds;
do {
tenantsIds = tenantDao.findTenantsIds(tenantsBatchRequest);
for (TenantId tenantId : tenantsIds.getData()) {
if (!partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) {
continue;
}
Optional<DefaultTenantProfileConfiguration> tenantProfileConfiguration = tenantProfileCache.get(tenantId).getProfileConfiguration();
if (tenantProfileConfiguration.isEmpty() || tenantProfileConfiguration.get().getRpcTtlDays() == 0) {
continue;
}
long ttl = TimeUnit.DAYS.toMillis(tenantProfileConfiguration.get().getRpcTtlDays());
long expirationTime = System.currentTimeMillis() - ttl;
long totalRemoved = rpcDao.deleteOutdatedRpcByTenantId(tenantId, expirationTime);
if (totalRemoved > 0) {
log.info("Removed {} outdated rpc(s) for tenant {} older than {}", totalRemoved, tenantId, new Date(expirationTime));
}
}
tenantsBatchRequest = tenantsBatchRequest.nextPageLink();
} while (tenantsIds.hasNext());
}
}
}

View File

@ -276,6 +276,9 @@ sql:
alarms:
checking_interval: "${SQL_ALARMS_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours
removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:3000}" # To delete outdated alarms not all at once but in batches
rpc:
enabled: "${SQL_TTL_RPC_ENABLED:true}"
checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours
# Actor system parameters
actors:

View File

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.rpc;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcStatus;
public interface RpcService {
Rpc save(Rpc rpc);
void deleteRpc(TenantId tenantId, RpcId id);
void deleteAllRpcByTenantId(TenantId tenantId);
Rpc findById(TenantId tenantId, RpcId id);
ListenableFuture<Rpc> findRpcByIdAsync(TenantId tenantId, RpcId id);
PageData<Rpc> findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink);
}

View File

@ -76,6 +76,12 @@ public class DataConstants {
public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE";
public static final String RPC_QUEUED = "RPC_QUEUED";
public static final String RPC_DELIVERED = "RPC_DELIVERED";
public static final String RPC_SUCCESSFUL = "RPC_SUCCESSFUL";
public static final String RPC_TIMEOUT = "RPC_TIMEOUT";
public static final String RPC_FAILED = "RPC_FAILED";
public static final String DEFAULT_SECRET_KEY = "";
public static final String SECRET_KEY_FIELD_NAME = "secretKey";
public static final String DURATION_MS_FIELD_NAME = "durationMs";

View File

@ -19,5 +19,5 @@ package org.thingsboard.server.common.data;
* @author Andrew Shvayka
*/
public enum EntityType {
TENANT, CUSTOMER, USER, DASHBOARD, ASSET, DEVICE, ALARM, RULE_CHAIN, RULE_NODE, ENTITY_VIEW, WIDGETS_BUNDLE, WIDGET_TYPE, TENANT_PROFILE, DEVICE_PROFILE, API_USAGE_STATE, TB_RESOURCE, OTA_PACKAGE, EDGE;
TENANT, CUSTOMER, USER, DASHBOARD, ASSET, DEVICE, ALARM, RULE_CHAIN, RULE_NODE, ENTITY_VIEW, WIDGETS_BUNDLE, WIDGET_TYPE, TENANT_PROFILE, DEVICE_PROFILE, API_USAGE_STATE, TB_RESOURCE, OTA_PACKAGE, EDGE, RPC;
}

View File

@ -75,6 +75,8 @@ public class EntityIdFactory {
return new OtaPackageId(uuid);
case EDGE:
return new EdgeId(uuid);
case RPC:
return new RpcId(uuid);
}
throw new IllegalArgumentException("EntityType " + type + " is not supported!");
}

View File

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.id;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.thingsboard.server.common.data.EntityType;
import java.util.UUID;
public final class RpcId extends UUIDBased implements EntityId {
private static final long serialVersionUID = 1L;
@JsonCreator
public RpcId(@JsonProperty("id") UUID id) {
super(id);
}
@JsonIgnore
@Override
public EntityType getEntityType() {
return EntityType.RPC;
}
}

View File

@ -0,0 +1,54 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.rpc;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.thingsboard.server.common.data.BaseData;
import org.thingsboard.server.common.data.HasTenantId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.TenantId;
@Data
@EqualsAndHashCode(callSuper = true)
public class Rpc extends BaseData<RpcId> implements HasTenantId {
private TenantId tenantId;
private DeviceId deviceId;
private long expirationTime;
private JsonNode request;
private JsonNode response;
private RpcStatus status;
public Rpc() {
super();
}
public Rpc(RpcId id) {
super(id);
}
public Rpc(Rpc rpc) {
super(rpc);
this.tenantId = rpc.getTenantId();
this.deviceId = rpc.getDeviceId();
this.expirationTime = rpc.getExpirationTime();
this.request = rpc.getRequest();
this.response = rpc.getResponse();
this.status = rpc.getStatus();
}
}

View File

@ -0,0 +1,20 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.rpc;
public enum RpcStatus {
QUEUED, DELIVERED, SUCCESSFUL, TIMEOUT, FAILED
}

View File

@ -56,6 +56,7 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
private int defaultStorageTtlDays;
private int alarmsTtlDays;
private int rpcTtlDays;
private double warnThreshold;

View File

@ -34,5 +34,6 @@ public class ToDeviceRpcRequest implements Serializable {
private final boolean oneway;
private final long expirationTime;
private final ToDeviceRpcRequestBody body;
private final boolean persisted;
}

View File

@ -318,6 +318,9 @@ message SubscribeToRPCMsg {
SessionType sessionType = 2;
}
message SendPendingRPCMsg {
}
message ToDeviceRpcRequestMsg {
int32 requestId = 1;
string methodName = 2;
@ -325,6 +328,8 @@ message ToDeviceRpcRequestMsg {
int64 expirationTime = 4;
int64 requestIdMSB = 5;
int64 requestIdLSB = 6;
bool oneway = 7;
bool persisted = 8;
}
message ToDeviceRpcResponseMsg {
@ -332,6 +337,13 @@ message ToDeviceRpcResponseMsg {
string payload = 2;
}
message ToDevicePersistedRpcResponseMsg {
int32 requestId = 1;
int64 requestIdMSB = 2;
int64 requestIdLSB = 3;
string status = 4;
}
message ToServerRpcRequestMsg {
int32 requestId = 1;
string methodName = 2;
@ -435,6 +447,8 @@ message TransportToDeviceActorMsg {
SubscriptionInfoProto subscriptionInfo = 7;
ClaimDeviceMsg claimDevice = 8;
ProvisionDeviceRequestMsg provisionDevice = 9;
ToDevicePersistedRpcResponseMsg persistedRpcResponseMsg = 10;
SendPendingRPCMsg sendPendingRPC = 11;
}
message TransportToRuleEngineMsg {

View File

@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportC
import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadConfiguration;
import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
import org.thingsboard.server.common.msg.session.FeatureType;
import org.thingsboard.server.common.msg.session.SessionMsgType;
@ -332,14 +333,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
break;
case TO_SERVER_RPC_REQUEST:
transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor,
transportConfigurationContainer.getRpcRequestDynamicMessageBuilder()), timeout);
transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), sessionInfo), timeout);
transportService.process(sessionInfo,
coapTransportAdaptor.convertToServerRpcRequest(sessionId, request),
new CoapNoOpCallback(exchange));
break;
case GET_ATTRIBUTES_REQUEST:
transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor,
transportConfigurationContainer.getRpcRequestDynamicMessageBuilder()), timeout);
transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), sessionInfo), timeout);
transportService.process(sessionInfo,
coapTransportAdaptor.convertToGetAttributes(sessionId, request),
new CoapNoOpCallback(exchange));
@ -362,12 +363,12 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) {
tokenToSessionInfoMap.putIfAbsent(token, sessionInfo);
transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder));
transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo));
transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
}
private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) {
return new CoapSessionListener(this, exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder);
private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) {
return new CoapSessionListener(this, exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo);
}
private String getTokenFromRequest(Request request) {
@ -455,12 +456,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
private final CoapExchange exchange;
private final CoapTransportAdaptor coapTransportAdaptor;
private final DynamicMessage.Builder rpcRequestDynamicMessageBuilder;
private final TransportProtos.SessionInfoProto sessionInfo;
CoapSessionListener(CoapTransportResource coapTransportResource, CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) {
CoapSessionListener(CoapTransportResource coapTransportResource, CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) {
this.coapTransportResource = coapTransportResource;
this.exchange = exchange;
this.coapTransportAdaptor = coapTransportAdaptor;
this.rpcRequestDynamicMessageBuilder = rpcRequestDynamicMessageBuilder;
this.sessionInfo = sessionInfo;
}
@Override
@ -503,11 +506,31 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
@Override
public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg msg) {
boolean successful;
try {
exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder));
successful = true;
} catch (AdaptorException e) {
log.trace("Failed to reply due to error", e);
exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
successful = false;
}
if (msg.getPersisted()) {
RpcStatus status;
if (!successful) {
status = RpcStatus.FAILED;
} else if (msg.getOneway()) {
status = RpcStatus.SUCCESSFUL;
} else {
status = RpcStatus.DELIVERED;
}
TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
.setRequestId(msg.getRequestId())
.setRequestIdLSB(msg.getRequestIdLSB())
.setRequestIdMSB(msg.getRequestIdMSB())
.setStatus(status.name())
.build();
coapTransportResource.transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY);
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.transport.http;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
@ -34,9 +35,10 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.common.data.TbTransportService;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportContext;
import org.thingsboard.server.common.transport.TransportService;
@ -95,7 +97,9 @@ public class DeviceApiController implements TbTransportService {
request.addAllSharedAttributeNames(sharedKeySet);
}
TransportService transportService = transportContext.getTransportService();
transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout());
transportService.registerSyncSession(sessionInfo,
new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo),
transportContext.getDefaultTimeout());
transportService.process(sessionInfo, request.build(), new SessionCloseOnErrorCallback(transportService, sessionInfo));
}));
return responseWriter;
@ -151,7 +155,8 @@ public class DeviceApiController implements TbTransportService {
transportContext.getTransportService().process(DeviceTransportType.DEFAULT, ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
TransportService transportService = transportContext.getTransportService();
transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter),
transportService.registerSyncSession(sessionInfo,
new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo),
timeout == 0 ? transportContext.getDefaultTimeout() : timeout);
transportService.process(sessionInfo, SubscribeToRPCMsg.getDefaultInstance(),
new SessionCloseOnErrorCallback(transportService, sessionInfo));
@ -181,7 +186,9 @@ public class DeviceApiController implements TbTransportService {
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
JsonObject request = new JsonParser().parse(json).getAsJsonObject();
TransportService transportService = transportContext.getTransportService();
transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout());
transportService.registerSyncSession(sessionInfo,
new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo),
transportContext.getDefaultTimeout());
transportService.process(sessionInfo, ToServerRpcRequestMsg.newBuilder().setRequestId(0)
.setMethodName(request.get("method").getAsString())
.setParams(request.get("params").toString()).build(),
@ -198,7 +205,8 @@ public class DeviceApiController implements TbTransportService {
transportContext.getTransportService().process(DeviceTransportType.DEFAULT, ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
TransportService transportService = transportContext.getTransportService();
transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter),
transportService.registerSyncSession(sessionInfo,
new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo),
timeout == 0 ? transportContext.getDefaultTimeout() : timeout);
transportService.process(sessionInfo, SubscribeToAttributeUpdatesMsg.getDefaultInstance(),
new SessionCloseOnErrorCallback(transportService, sessionInfo));
@ -372,13 +380,12 @@ public class DeviceApiController implements TbTransportService {
}
}
@RequiredArgsConstructor
private static class HttpSessionListener implements SessionMsgListener {
private final DeferredResult<ResponseEntity> responseWriter;
HttpSessionListener(DeferredResult<ResponseEntity> responseWriter) {
this.responseWriter = responseWriter;
}
private final TransportService transportService;
private final SessionInfoProto sessionInfo;
@Override
public void onGetAttributesResponse(GetAttributeResponseMsg msg) {
@ -399,6 +406,21 @@ public class DeviceApiController implements TbTransportService {
@Override
public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg msg) {
responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));
if (msg.getPersisted()) {
RpcStatus status;
if (msg.getOneway()) {
status = RpcStatus.SUCCESSFUL;
} else {
status = RpcStatus.DELIVERED;
}
TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
.setRequestId(msg.getRequestId())
.setRequestIdLSB(msg.getRequestIdLSB())
.setRequestIdMSB(msg.getRequestIdMSB())
.setStatus(status.name())
.build();
transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY);
}
}
@Override

View File

@ -155,7 +155,7 @@ public class LwM2MBootstrapSecurityStore implements BootstrapSecurityStore {
LwM2MServerBootstrap profileLwm2mServer = JacksonUtil.fromString(JacksonUtil.toString(bootstrapObject.getLwm2mServer()), LwM2MServerBootstrap.class);
UUID sessionUUiD = UUID.randomUUID();
TransportProtos.SessionInfoProto sessionInfo = helper.getValidateSessionInfo(store.getMsg(), sessionUUiD.getMostSignificantBits(), sessionUUiD.getLeastSignificantBits());
context.getTransportService().registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(null, null, null, sessionInfo));
context.getTransportService().registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(null, null, null, sessionInfo, context.getTransportService()));
if (this.getValidatedSecurityMode(lwM2MBootstrapConfig.bootstrapServer, profileServerBootstrap, lwM2MBootstrapConfig.lwm2mServer, profileLwm2mServer)) {
lwM2MBootstrapConfig.bootstrapServer = new LwM2MServerBootstrap(lwM2MBootstrapConfig.bootstrapServer, profileServerBootstrap);
lwM2MBootstrapConfig.lwm2mServer = new LwM2MServerBootstrap(lwM2MBootstrapConfig.lwm2mServer, profileLwm2mServer);

View File

@ -23,7 +23,10 @@ import org.jetbrains.annotations.NotNull;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.ResourceType;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
@ -45,6 +48,7 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s
private final LwM2MAttributesService attributesService;
private final LwM2MRpcRequestHandler rpcHandler;
private final TransportProtos.SessionInfoProto sessionInfo;
private final TransportService transportService;
@Override
public void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse) {
@ -78,7 +82,22 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s
@Override
public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) {
this.rpcHandler.onToDeviceRpcRequest(toDeviceRequest,this.sessionInfo);
this.rpcHandler.onToDeviceRpcRequest(toDeviceRequest, this.sessionInfo);
if (toDeviceRequest.getPersisted()) {
RpcStatus status;
if (toDeviceRequest.getOneway()) {
status = RpcStatus.SUCCESSFUL;
} else {
status = RpcStatus.DELIVERED;
}
TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
.setRequestId(toDeviceRequest.getRequestId())
.setRequestIdLSB(toDeviceRequest.getRequestIdLSB())
.setRequestIdMSB(toDeviceRequest.getRequestIdMSB())
.setStatus(status.name())
.build();
transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY);
}
}
@Override

View File

@ -212,7 +212,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
}
logService.log(lwM2MClient, LOG_LWM2M_INFO + ": Client registered with registration id: " + registration.getId());
SessionInfoProto sessionInfo = lwM2MClient.getSession();
transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo));
transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo, transportService));
log.warn("40) sessionId [{}] Registering rpc subscription after Registration client", new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()));
TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder()
.setSessionInfo(sessionInfo)
@ -888,7 +888,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
*/
private void reportActivityAndRegister(SessionInfoProto sessionInfo) {
if (sessionInfo != null && transportService.reportActivity(sessionInfo) == null) {
transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo));
transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo, transportService));
this.reportActivitySubscription(sessionInfo);
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.gson.JsonParseException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
@ -47,8 +48,9 @@ import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.common.data.id.OtaPackageId;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.msg.EncryptionUtil;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.common.transport.SessionMsgListener;
@ -813,7 +815,31 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
log.trace("[{}] Received RPC command to device", sessionId);
try {
deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest)
.ifPresent(payload -> {
ChannelFuture channelFuture = deviceSessionCtx.getChannel().writeAndFlush(payload);
if (rpcRequest.getPersisted()) {
channelFuture.addListener(future -> {
RpcStatus status;
Throwable t = future.cause();
if (t != null) {
log.error("Failed delivering RPC command to device!", t);
status = RpcStatus.FAILED;
} else if (rpcRequest.getOneway()) {
status = RpcStatus.SUCCESSFUL;
} else {
status = RpcStatus.DELIVERED;
}
TransportProtos.ToDevicePersistedRpcResponseMsg msg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
.setRequestId(rpcRequest.getRequestId())
.setRequestIdLSB(rpcRequest.getRequestIdLSB())
.setRequestIdMSB(rpcRequest.getRequestIdMSB())
.setStatus(status.name())
.build();
transportService.process(deviceSessionCtx.getSessionInfo(), msg, TransportServiceCallback.EMPTY);
});
}
});
} catch (Exception e) {
log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e);
}

View File

@ -15,9 +15,13 @@
*/
package org.thingsboard.server.transport.mqtt.session;
import io.netty.channel.ChannelFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
@ -32,9 +36,11 @@ import java.util.concurrent.ConcurrentMap;
public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener {
private final GatewaySessionHandler parent;
private final TransportService transportService;
public GatewayDeviceSessionCtx(GatewaySessionHandler parent, TransportDeviceInfo deviceInfo,
DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap,
TransportService transportService) {
super(UUID.randomUUID(), mqttQoSMap);
this.parent = parent;
setSessionInfo(SessionInfoProto.newBuilder()
@ -56,6 +62,7 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
.build());
setDeviceInfo(deviceInfo);
setDeviceProfile(deviceProfile);
this.transportService = transportService;
}
@Override
@ -89,7 +96,32 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
@Override
public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg request) {
try {
parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(parent::writeAndFlush);
parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(
payload -> {
ChannelFuture channelFuture = parent.writeAndFlush(payload);
if (request.getPersisted()) {
channelFuture.addListener(future -> {
RpcStatus status;
Throwable t = future.cause();
if (t != null) {
log.error("Failed delivering RPC command to device!", t);
status = RpcStatus.FAILED;
} else if (request.getOneway()) {
status = RpcStatus.SUCCESSFUL;
} else {
status = RpcStatus.DELIVERED;
}
TransportProtos.ToDevicePersistedRpcResponseMsg msg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
.setRequestId(request.getRequestId())
.setRequestIdLSB(request.getRequestIdLSB())
.setRequestIdMSB(request.getRequestIdMSB())
.setStatus(status.name())
.build();
transportService.process(getSessionInfo(), msg, TransportServiceCallback.EMPTY);
});
}
}
);
} catch (Exception e) {
log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
}

View File

@ -28,6 +28,7 @@ import com.google.gson.JsonSyntaxException;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ProtocolStringList;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
@ -188,8 +189,8 @@ public class GatewaySessionHandler {
}
}
void writeAndFlush(MqttMessage mqttMessage) {
channel.writeAndFlush(mqttMessage);
ChannelFuture writeAndFlush(MqttMessage mqttMessage) {
return channel.writeAndFlush(mqttMessage);
}
int nextMsgId() {
@ -251,7 +252,7 @@ public class GatewaySessionHandler {
new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse>() {
@Override
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap);
GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();

View File

@ -26,7 +26,9 @@ import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
@ -139,6 +141,21 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
@Override
public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) {
snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest);
if (toDeviceRequest.getPersisted()) {
RpcStatus status;
if (toDeviceRequest.getOneway()) {
status = RpcStatus.SUCCESSFUL;
} else {
status = RpcStatus.DELIVERED;
}
TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
.setRequestId(toDeviceRequest.getRequestId())
.setRequestIdLSB(toDeviceRequest.getRequestIdLSB())
.setRequestIdMSB(toDeviceRequest.getRequestIdMSB())
.setStatus(status.name())
.build();
snmpTransportContext.getTransportService().process(getSessionInfo(), responseMsg, TransportServiceCallback.EMPTY);
}
}
@Override

View File

@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.common.transport.service.SessionMetaData;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
@ -109,6 +110,8 @@ public interface TransportService {
void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDevicePersistedRpcResponseMsg msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback);

View File

@ -557,6 +557,15 @@ public class DefaultTransportService implements TransportService {
}
}
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDevicePersistedRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPersistedRpcResponseMsg(msg).build(),
new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback));
}
}
private void processTimeout(String requestId) {
RpcRequestMetadata data = toServerRpcPendingMap.remove(requestId);
if (data != null) {

View File

@ -506,6 +506,17 @@ public class ModelConstants {
public static final String OTA_PACKAGE_DATA_SIZE_COLUMN = "data_size";
public static final String OTA_PACKAGE_ADDITIONAL_INFO_COLUMN = ADDITIONAL_INFO_PROPERTY;
/**
* Persisted RPC constants.
*/
public static final String RPC_TABLE_NAME = "rpc";
public static final String RPC_TENANT_ID_COLUMN = TENANT_ID_COLUMN;
public static final String RPC_DEVICE_ID = "device_id";
public static final String RPC_EXPIRATION_TIME = "expiration_time";
public static final String RPC_REQUEST = "request";
public static final String RPC_RESPONSE = "response";
public static final String RPC_STATUS = "status";
/**
* Edge constants.
*/

View File

@ -0,0 +1,102 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.model.sql;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.hibernate.annotations.Type;
import org.hibernate.annotations.TypeDef;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.dao.model.BaseEntity;
import org.thingsboard.server.dao.model.BaseSqlEntity;
import org.thingsboard.server.dao.util.mapping.JsonStringType;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.Table;
import java.util.UUID;
import static org.thingsboard.server.dao.model.ModelConstants.RPC_DEVICE_ID;
import static org.thingsboard.server.dao.model.ModelConstants.RPC_EXPIRATION_TIME;
import static org.thingsboard.server.dao.model.ModelConstants.RPC_REQUEST;
import static org.thingsboard.server.dao.model.ModelConstants.RPC_RESPONSE;
import static org.thingsboard.server.dao.model.ModelConstants.RPC_STATUS;
import static org.thingsboard.server.dao.model.ModelConstants.RPC_TABLE_NAME;
import static org.thingsboard.server.dao.model.ModelConstants.RPC_TENANT_ID_COLUMN;
@Data
@EqualsAndHashCode(callSuper = true)
@Entity
@TypeDef(name = "json", typeClass = JsonStringType.class)
@Table(name = RPC_TABLE_NAME)
public class RpcEntity extends BaseSqlEntity<Rpc> implements BaseEntity<Rpc> {
@Column(name = RPC_TENANT_ID_COLUMN)
private UUID tenantId;
@Column(name = RPC_DEVICE_ID)
private UUID deviceId;
@Column(name = RPC_EXPIRATION_TIME)
private long expirationTime;
@Type(type = "json")
@Column(name = RPC_REQUEST)
private JsonNode request;
@Type(type = "json")
@Column(name = RPC_RESPONSE)
private JsonNode response;
@Enumerated(EnumType.STRING)
@Column(name = RPC_STATUS)
private RpcStatus status;
public RpcEntity() {
super();
}
public RpcEntity(Rpc rpc) {
this.setUuid(rpc.getUuidId());
this.createdTime = rpc.getCreatedTime();
this.tenantId = rpc.getTenantId().getId();
this.deviceId = rpc.getDeviceId().getId();
this.expirationTime = rpc.getExpirationTime();
this.request = rpc.getRequest();
this.response = rpc.getResponse();
this.status = rpc.getStatus();
}
@Override
public Rpc toData() {
Rpc rpc = new Rpc(new RpcId(id));
rpc.setCreatedTime(createdTime);
rpc.setTenantId(new TenantId(tenantId));
rpc.setDeviceId(new DeviceId(deviceId));
rpc.setExpirationTime(expirationTime);
rpc.setRequest(request);
rpc.setResponse(response);
rpc.setStatus(status);
return rpc;
}
}

View File

@ -0,0 +1,100 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.rpc;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.dao.service.PaginatedRemover;
import static org.thingsboard.server.dao.service.Validator.validateId;
import static org.thingsboard.server.dao.service.Validator.validatePageLink;
@Service
@Slf4j
@RequiredArgsConstructor
public class BaseRpcService implements RpcService {
public static final String INCORRECT_TENANT_ID = "Incorrect tenantId ";
public static final String INCORRECT_RPC_ID = "Incorrect rpcId ";
private final RpcDao rpcDao;
@Override
public Rpc save(Rpc rpc) {
log.trace("Executing save, [{}]", rpc);
return rpcDao.save(rpc.getTenantId(), rpc);
}
@Override
public void deleteRpc(TenantId tenantId, RpcId rpcId) {
log.trace("Executing deleteRpc, tenantId [{}], rpcId [{}]", tenantId, rpcId);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
validateId(rpcId, INCORRECT_RPC_ID + rpcId);
rpcDao.removeById(tenantId, rpcId.getId());
}
@Override
public void deleteAllRpcByTenantId(TenantId tenantId) {
log.trace("Executing deleteAllRpcByTenantId, tenantId [{}]", tenantId);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
tenantRpcRemover.removeEntities(tenantId, tenantId);
}
@Override
public Rpc findById(TenantId tenantId, RpcId rpcId) {
log.trace("Executing findById, tenantId [{}], rpcId [{}]", tenantId, rpcId);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
validateId(rpcId, INCORRECT_RPC_ID + rpcId);
return rpcDao.findById(tenantId, rpcId.getId());
}
@Override
public ListenableFuture<Rpc> findRpcByIdAsync(TenantId tenantId, RpcId rpcId) {
log.trace("Executing findRpcByIdAsync, tenantId [{}], rpcId: [{}]", tenantId, rpcId);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
validateId(rpcId, INCORRECT_RPC_ID + rpcId);
return rpcDao.findByIdAsync(tenantId, rpcId.getId());
}
@Override
public PageData<Rpc> findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) {
log.trace("Executing findAllByDeviceIdAndStatus, tenantId [{}], deviceId [{}], rpcStatus [{}], pageLink [{}]", tenantId, deviceId, rpcStatus, pageLink);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
validatePageLink(pageLink);
return rpcDao.findAllByDeviceId(tenantId, deviceId, rpcStatus, pageLink);
}
private PaginatedRemover<TenantId, Rpc> tenantRpcRemover =
new PaginatedRemover<>() {
@Override
protected PageData<Rpc> findEntities(TenantId tenantId, TenantId id, PageLink pageLink) {
return rpcDao.findAllRpcByTenantId(id, pageLink);
}
@Override
protected void removeEntity(TenantId tenantId, Rpc entity) {
deleteRpc(tenantId, entity.getId());
}
};
}

View File

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.rpc;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.dao.Dao;
public interface RpcDao extends Dao<Rpc> {
PageData<Rpc> findAllByDeviceId(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink);
PageData<Rpc> findAllRpcByTenantId(TenantId tenantId, PageLink pageLink);
Long deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime);
}

View File

@ -0,0 +1,66 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql.rpc;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sql.RpcEntity;
import org.thingsboard.server.dao.rpc.RpcDao;
import org.thingsboard.server.dao.sql.JpaAbstractDao;
import java.util.UUID;
@Slf4j
@Component
@AllArgsConstructor
public class JpaRpcDao extends JpaAbstractDao<RpcEntity, Rpc> implements RpcDao {
private final RpcRepository rpcRepository;
@Override
protected Class<RpcEntity> getEntityClass() {
return RpcEntity.class;
}
@Override
protected CrudRepository<RpcEntity, UUID> getCrudRepository() {
return rpcRepository;
}
@Override
public PageData<Rpc> findAllByDeviceId(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) {
return DaoUtil.toPageData(rpcRepository.findAllByTenantIdAndDeviceIdAndStatus(tenantId.getId(), deviceId.getId(), rpcStatus, DaoUtil.toPageable(pageLink)));
}
@Override
public PageData<Rpc> findAllRpcByTenantId(TenantId tenantId, PageLink pageLink) {
return DaoUtil.toPageData(rpcRepository.findAllByTenantId(tenantId.getId(), DaoUtil.toPageable(pageLink)));
}
@Override
public Long deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime) {
return rpcRepository.deleteOutdatedRpcByTenantId(tenantId.getId(), expirationTime);
}
}

View File

@ -0,0 +1,36 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql.rpc;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.dao.model.sql.RpcEntity;
import java.util.UUID;
public interface RpcRepository extends CrudRepository<RpcEntity, UUID> {
Page<RpcEntity> findAllByTenantIdAndDeviceIdAndStatus(UUID tenantId, UUID deviceId, RpcStatus status, Pageable pageable);
Page<RpcEntity> findAllByTenantId(UUID tenantId, Pageable pageable);
@Query(value = "WITH deleted AS (DELETE FROM rpc WHERE (tenant_id = :tenantId AND created_time < :expirationTime) IS TRUE RETURNING *) SELECT count(*) FROM deleted",
nativeQuery = true)
Long deleteOutdatedRpcByTenantId(@Param("tenantId") UUID tenantId, @Param("expirationTime") Long expirationTime);
}

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.ota.OtaPackageService;
import org.thingsboard.server.dao.resource.ResourceService;
import org.thingsboard.server.dao.rpc.RpcService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.service.PaginatedRemover;
@ -96,6 +97,9 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe
@Autowired
private OtaPackageService otaPackageService;
@Autowired
private RpcService rpcService;
@Override
public Tenant findTenantById(TenantId tenantId) {
log.trace("Executing findTenantById [{}]", tenantId);
@ -151,6 +155,7 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe
apiUsageStateService.deleteApiUsageStateByTenantId(tenantId);
resourceService.deleteResourcesByTenantId(tenantId);
otaPackageService.deleteOtaPackagesByTenantId(tenantId);
rpcService.deleteAllRpcByTenantId(tenantId);
tenantDao.removeById(tenantId, tenantId.getId());
deleteEntityRelations(tenantId, tenantId);
}

View File

@ -567,3 +567,14 @@ CREATE TABLE IF NOT EXISTS edge_event (
tenant_id uuid,
ts bigint NOT NULL
);
CREATE TABLE IF NOT EXISTS rpc (
id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY,
created_time bigint NOT NULL,
tenant_id uuid NOT NULL,
device_id uuid NOT NULL,
expiration_time bigint NOT NULL,
request varchar(10000000) NOT NULL,
response varchar(10000000),
status varchar(255) NOT NULL
);

View File

@ -46,3 +46,4 @@ CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribu
CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time);
CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id);

View File

@ -605,6 +605,17 @@ CREATE TABLE IF NOT EXISTS edge_event (
ts bigint NOT NULL
);
CREATE TABLE IF NOT EXISTS rpc (
id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY,
created_time bigint NOT NULL,
tenant_id uuid NOT NULL,
device_id uuid NOT NULL,
expiration_time bigint NOT NULL,
request varchar(10000000) NOT NULL,
response varchar(10000000),
status varchar(255) NOT NULL
);
CREATE OR REPLACE PROCEDURE cleanup_events_by_ttl(IN ttl bigint, IN debug_ttl bigint, INOUT deleted bigint)
LANGUAGE plpgsql AS
$$

View File

@ -36,4 +36,5 @@ DROP TABLE IF EXISTS resource;
DROP TABLE IF EXISTS ota_package;
DROP TABLE IF EXISTS edge;
DROP TABLE IF EXISTS edge_event;
DROP TABLE IF EXISTS rpc;
DROP FUNCTION IF EXISTS to_uuid;

View File

@ -37,3 +37,4 @@ DROP TABLE IF EXISTS resource;
DROP TABLE IF EXISTS firmware;
DROP TABLE IF EXISTS edge;
DROP TABLE IF EXISTS edge_event;
DROP TABLE IF EXISTS rpc;

View File

@ -35,6 +35,7 @@ public final class RuleEngineDeviceRpcRequest {
private final UUID requestUUID;
private final String originServiceId;
private final boolean oneway;
private final boolean persisted;
private final String method;
private final String body;
private final long expirationTime;

View File

@ -33,8 +33,8 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
type = ComponentType.FILTER,
name = "message type switch",
configClazz = EmptyNodeConfiguration.class,
relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "Activity Event", "Inactivity Event",
"Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned",
relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed",
"Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned",
"Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant",
"Timeseries Updated", "Timeseries Deleted"},
nodeDescription = "Route incoming messages by Message Type",
@ -95,6 +95,16 @@ public class TbMsgTypeSwitchNode implements TbNode {
relationType = "Timeseries Updated";
} else if (msg.getType().equals(DataConstants.TIMESERIES_DELETED)) {
relationType = "Timeseries Deleted";
} else if (msg.getType().equals(DataConstants.RPC_QUEUED)) {
relationType = "RPC Queued";
} else if (msg.getType().equals(DataConstants.RPC_DELIVERED)) {
relationType = "RPC Delivered";
} else if (msg.getType().equals(DataConstants.RPC_SUCCESSFUL)) {
relationType = "RPC Successful";
} else if (msg.getType().equals(DataConstants.RPC_TIMEOUT)) {
relationType = "RPC Timeout";
} else if (msg.getType().equals(DataConstants.RPC_FAILED)) {
relationType = "RPC Failed";
} else {
relationType = "Other";
}

View File

@ -81,6 +81,9 @@ public class TbSendRPCRequestNode implements TbNode {
tmp = msg.getMetaData().getValue("oneway");
boolean oneway = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp);
tmp = msg.getMetaData().getValue("persisted");
boolean persisted = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp);
tmp = msg.getMetaData().getValue("requestUUID");
UUID requestUUID = !StringUtils.isEmpty(tmp) ? UUID.fromString(tmp) : Uuids.timeBased();
tmp = msg.getMetaData().getValue("originServiceId");
@ -108,6 +111,7 @@ public class TbSendRPCRequestNode implements TbNode {
.originServiceId(originServiceId)
.expirationTime(expirationTime)
.restApiCall(restApiCall)
.persisted(persisted)
.build();
ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> {

View File

@ -196,6 +196,18 @@
{{ 'tenant-profile.alarms-ttl-days-days-range' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.rpc-ttl-days</mat-label>
<input matInput required min="0" step="1"
formControlName="rpcTtlDays"
type="number">
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('rpcTtlDays').hasError('required')">
{{ 'tenant-profile.rpc-ttl-days-required' | translate}}
</mat-error>
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('rpcTtlDays').hasError('min')">
{{ 'tenant-profile.rpc-ttl-days-days-range' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field class="mat-block">
<mat-label translate>tenant-profile.max-rule-node-executions-per-message</mat-label>
<input matInput required min="0" step="1"

View File

@ -77,7 +77,8 @@ export class DefaultTenantProfileConfigurationComponent implements ControlValueA
maxSms: [null, [Validators.required, Validators.min(0)]],
maxCreatedAlarms: [null, [Validators.required, Validators.min(0)]],
defaultStorageTtlDays: [null, [Validators.required, Validators.min(0)]],
alarmsTtlDays: [null, [Validators.required, Validators.min(0)]]
alarmsTtlDays: [null, [Validators.required, Validators.min(0)]],
rpcTtlDays: [null, [Validators.required, Validators.min(0)]]
});
this.defaultTenantProfileConfigurationFormGroup.valueChanges.subscribe(() => {
this.updateModel();

View File

@ -352,7 +352,12 @@ export enum MessageType {
ATTRIBUTES_UPDATED = 'ATTRIBUTES_UPDATED',
ATTRIBUTES_DELETED = 'ATTRIBUTES_DELETED',
TIMESERIES_UPDATED = 'TIMESERIES_UPDATED',
TIMESERIES_DELETED = 'TIMESERIES_DELETED'
TIMESERIES_DELETED = 'TIMESERIES_DELETED',
RPC_QUEUED = 'RPC_QUEUED',
RPC_DELIVERED = 'RPC_DELIVERED',
RPC_SUCCESSFUL = 'RPC_SUCCESSFUL',
RPC_TIMEOUT = 'RPC_TIMEOUT',
RPC_FAILED = 'RPC_FAILED'
}
export const messageTypeNames = new Map<MessageType, string>(
@ -373,7 +378,12 @@ export const messageTypeNames = new Map<MessageType, string>(
[MessageType.ATTRIBUTES_UPDATED, 'Attributes Updated'],
[MessageType.ATTRIBUTES_DELETED, 'Attributes Deleted'],
[MessageType.TIMESERIES_UPDATED, 'Timeseries Updated'],
[MessageType.TIMESERIES_DELETED, 'Timeseries Deleted']
[MessageType.TIMESERIES_DELETED, 'Timeseries Deleted'],
[MessageType.RPC_QUEUED, 'RPC Queued'],
[MessageType.RPC_DELIVERED, 'RPC Delivered'],
[MessageType.RPC_SUCCESSFUL, 'RPC Successful'],
[MessageType.RPC_TIMEOUT, 'RPC Timeout'],
[MessageType.RPC_FAILED, 'RPC Failed']
]
);

View File

@ -53,6 +53,7 @@ export interface DefaultTenantProfileConfiguration {
defaultStorageTtlDays: number;
alarmsTtlDays: number;
rpcTtlDays: number;
}
export type TenantProfileConfigurations = DefaultTenantProfileConfiguration;
@ -85,7 +86,8 @@ export function createTenantProfileConfiguration(type: TenantProfileType): Tenan
maxSms: 0,
maxCreatedAlarms: 0,
defaultStorageTtlDays: 0,
alarmsTtlDays: 0
alarmsTtlDays: 0,
rpcTtlDays: 0
};
configuration = {...defaultConfiguration, type: TenantProfileType.DEFAULT};
break;

View File

@ -2638,6 +2638,9 @@
"alarms-ttl-days": "Alarms TTL days (0 - unlimited)",
"alarms-ttl-days-required": "Alarms TTL days required",
"alarms-ttl-days-days-range": "Alarms TTL days can't be negative",
"rpc-ttl-days": "RPC TTL days (0 - unlimited)",
"rpc-ttl-days-required": "RPC TTL days required",
"rpc-ttl-days-days-range": "RPC TTL days can't be negative",
"max-rule-node-executions-per-message": "Maximum number of rule node executions per message (0 - unlimited)",
"max-rule-node-executions-per-message-required": "Maximum number of rule node executions per message is required.",
"max-rule-node-executions-per-message-range": "Maximum number of rule node executions per message can't be negative",