Merge branch 'master' into Coap_oneEndPoint
This commit is contained in:
commit
f0ec3616e7
@ -197,3 +197,17 @@ $$;
|
||||
ALTER TABLE api_usage_state
|
||||
ADD COLUMN IF NOT EXISTS alarm_exec VARCHAR(32);
|
||||
UPDATE api_usage_state SET alarm_exec = 'ENABLED' WHERE alarm_exec IS NULL;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS rpc (
|
||||
id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY,
|
||||
created_time bigint NOT NULL,
|
||||
tenant_id uuid NOT NULL,
|
||||
device_id uuid NOT NULL,
|
||||
expiration_time bigint NOT NULL,
|
||||
request varchar(10000000) NOT NULL,
|
||||
response varchar(10000000),
|
||||
status varchar(255) NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id);
|
||||
|
||||
|
||||
@ -65,6 +65,7 @@ import org.thingsboard.server.dao.relation.RelationService;
|
||||
import org.thingsboard.server.dao.resource.ResourceService;
|
||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||
import org.thingsboard.server.dao.rule.RuleNodeStateService;
|
||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.dao.tenant.TenantProfileService;
|
||||
import org.thingsboard.server.dao.tenant.TenantService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
@ -80,9 +81,9 @@ import org.thingsboard.server.service.executors.ExternalCallExecutorService;
|
||||
import org.thingsboard.server.service.executors.SharedEventLoopGroupService;
|
||||
import org.thingsboard.server.service.mail.MailExecutorService;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.service.queue.TbClusterService;
|
||||
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
|
||||
import org.thingsboard.server.service.rpc.TbRpcService;
|
||||
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
|
||||
import org.thingsboard.server.service.script.JsExecutorService;
|
||||
import org.thingsboard.server.service.script.JsInvokeService;
|
||||
@ -303,23 +304,33 @@ public class ActorSystemContext {
|
||||
|
||||
@Lazy
|
||||
@Autowired(required = false)
|
||||
@Getter private EdgeService edgeService;
|
||||
@Getter
|
||||
private EdgeService edgeService;
|
||||
|
||||
@Lazy
|
||||
@Autowired(required = false)
|
||||
@Getter private EdgeEventService edgeEventService;
|
||||
@Getter
|
||||
private EdgeEventService edgeEventService;
|
||||
|
||||
@Lazy
|
||||
@Autowired(required = false)
|
||||
@Getter private EdgeRpcService edgeRpcService;
|
||||
@Getter
|
||||
private EdgeRpcService edgeRpcService;
|
||||
|
||||
@Lazy
|
||||
@Autowired(required = false)
|
||||
@Getter private ResourceService resourceService;
|
||||
@Getter
|
||||
private ResourceService resourceService;
|
||||
|
||||
@Lazy
|
||||
@Autowired(required = false)
|
||||
@Getter private OtaPackageService otaPackageService;
|
||||
@Getter
|
||||
private OtaPackageService otaPackageService;
|
||||
|
||||
@Lazy
|
||||
@Autowired(required = false)
|
||||
@Getter
|
||||
private TbRpcService tbRpcService;
|
||||
|
||||
@Value("${actors.session.max_concurrent_sessions_per_device:1}")
|
||||
@Getter
|
||||
|
||||
@ -46,7 +46,7 @@ public class DeviceActor extends ContextAwareActor {
|
||||
super.init(ctx);
|
||||
log.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId);
|
||||
try {
|
||||
processor.initSessionTimeout(ctx);
|
||||
processor.init(ctx);
|
||||
log.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId);
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.deviceId, e);
|
||||
|
||||
@ -23,6 +23,7 @@ import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.rule.engine.api.RpcError;
|
||||
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
|
||||
import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg;
|
||||
@ -38,12 +39,17 @@ import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.RpcId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKey;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
||||
import org.thingsboard.server.common.data.rpc.Rpc;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
|
||||
import org.thingsboard.server.common.data.security.DeviceCredentials;
|
||||
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
|
||||
@ -52,8 +58,8 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
|
||||
import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
|
||||
@ -68,10 +74,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToDevicePersistedRpcResponseMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
|
||||
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
|
||||
@ -162,20 +170,19 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
|
||||
void processRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg) {
|
||||
ToDeviceRpcRequest request = msg.getMsg();
|
||||
ToDeviceRpcRequestBody body = request.getBody();
|
||||
ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder()
|
||||
.setRequestId(rpcSeq++)
|
||||
.setMethodName(body.getMethod())
|
||||
.setParams(body.getParams())
|
||||
.setExpirationTime(request.getExpirationTime())
|
||||
.setRequestIdMSB(request.getId().getMostSignificantBits())
|
||||
.setRequestIdLSB(request.getId().getLeastSignificantBits())
|
||||
.build();
|
||||
ToDeviceRpcRequestMsg rpcRequest = creteToDeviceRpcRequestMsg(request);
|
||||
|
||||
long timeout = request.getExpirationTime() - System.currentTimeMillis();
|
||||
boolean persisted = request.isPersisted();
|
||||
|
||||
if (timeout <= 0) {
|
||||
log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime());
|
||||
if (persisted) {
|
||||
createRpc(request, RpcStatus.TIMEOUT);
|
||||
}
|
||||
return;
|
||||
} else if (persisted) {
|
||||
createRpc(request, RpcStatus.QUEUED);
|
||||
}
|
||||
|
||||
boolean sent;
|
||||
@ -192,10 +199,16 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
syncSessionSet.add(key);
|
||||
}
|
||||
});
|
||||
log.trace("46) Rpc syncSessionSet [{}] subscription after sent [{}]",syncSessionSet, rpcSubscriptions);
|
||||
log.trace("46) Rpc syncSessionSet [{}] subscription after sent [{}]", syncSessionSet, rpcSubscriptions);
|
||||
syncSessionSet.forEach(rpcSubscriptions::remove);
|
||||
}
|
||||
|
||||
if (persisted && !(sent || request.isOneway())) {
|
||||
ObjectNode response = JacksonUtil.newObjectNode();
|
||||
response.put("rpcId", request.getId().toString());
|
||||
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), JacksonUtil.toString(response), null));
|
||||
}
|
||||
|
||||
if (request.isOneway() && sent) {
|
||||
log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
|
||||
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null));
|
||||
@ -209,6 +222,32 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) {
|
||||
Rpc rpc = new Rpc(new RpcId(request.getId()));
|
||||
rpc.setCreatedTime(System.currentTimeMillis());
|
||||
rpc.setTenantId(tenantId);
|
||||
rpc.setDeviceId(deviceId);
|
||||
rpc.setExpirationTime(request.getExpirationTime());
|
||||
rpc.setRequest(JacksonUtil.valueToTree(request));
|
||||
rpc.setStatus(status);
|
||||
systemContext.getTbRpcService().save(tenantId, rpc);
|
||||
return systemContext.getTbRpcService().save(tenantId, rpc);
|
||||
}
|
||||
|
||||
private ToDeviceRpcRequestMsg creteToDeviceRpcRequestMsg(ToDeviceRpcRequest request) {
|
||||
ToDeviceRpcRequestBody body = request.getBody();
|
||||
return ToDeviceRpcRequestMsg.newBuilder()
|
||||
.setRequestId(rpcSeq++)
|
||||
.setMethodName(body.getMethod())
|
||||
.setParams(body.getParams())
|
||||
.setExpirationTime(request.getExpirationTime())
|
||||
.setRequestIdMSB(request.getId().getMostSignificantBits())
|
||||
.setRequestIdLSB(request.getId().getLeastSignificantBits())
|
||||
.setOneway(request.isOneway())
|
||||
.setPersisted(request.isPersisted())
|
||||
.build();
|
||||
}
|
||||
|
||||
void processRpcResponsesFromEdge(TbActorCtx context, FromDeviceRpcResponseActorMsg responseMsg) {
|
||||
log.debug("[{}] Processing rpc command response from edge session", deviceId);
|
||||
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
|
||||
@ -230,6 +269,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
|
||||
if (requestMd != null) {
|
||||
log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
|
||||
systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.TIMEOUT, null);
|
||||
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
|
||||
null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
|
||||
}
|
||||
@ -271,7 +311,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
.setExpirationTime(request.getExpirationTime())
|
||||
.setRequestIdMSB(request.getId().getMostSignificantBits())
|
||||
.setRequestIdLSB(request.getId().getLeastSignificantBits())
|
||||
.setOneway(request.isOneway())
|
||||
.setPersisted(request.isPersisted())
|
||||
.build();
|
||||
|
||||
sendToTransport(rpcRequest, sessionId, nodeId);
|
||||
};
|
||||
}
|
||||
@ -279,31 +322,39 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
void process(TbActorCtx context, TransportToDeviceActorMsgWrapper wrapper) {
|
||||
TransportToDeviceActorMsg msg = wrapper.getMsg();
|
||||
TbCallback callback = wrapper.getCallback();
|
||||
var sessionInfo = msg.getSessionInfo();
|
||||
|
||||
if (msg.hasSessionEvent()) {
|
||||
processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent());
|
||||
processSessionStateMsgs(sessionInfo, msg.getSessionEvent());
|
||||
}
|
||||
if (msg.hasSubscribeToAttributes()) {
|
||||
processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToAttributes());
|
||||
processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToAttributes());
|
||||
}
|
||||
if (msg.hasSubscribeToRPC()) {
|
||||
processSubscriptionCommands(context, msg.getSessionInfo(), msg.getSubscribeToRPC());
|
||||
processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToRPC());
|
||||
}
|
||||
if (msg.hasSendPendingRPC()) {
|
||||
sendPendingRequests(context, getSessionId(sessionInfo), sessionInfo);
|
||||
}
|
||||
if (msg.hasGetAttributes()) {
|
||||
handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes());
|
||||
handleGetAttributesRequest(context, sessionInfo, msg.getGetAttributes());
|
||||
}
|
||||
if (msg.hasToDeviceRPCCallResponse()) {
|
||||
processRpcResponses(context, msg.getSessionInfo(), msg.getToDeviceRPCCallResponse());
|
||||
processRpcResponses(context, sessionInfo, msg.getToDeviceRPCCallResponse());
|
||||
}
|
||||
if (msg.hasSubscriptionInfo()) {
|
||||
handleSessionActivity(context, msg.getSessionInfo(), msg.getSubscriptionInfo());
|
||||
handleSessionActivity(context, sessionInfo, msg.getSubscriptionInfo());
|
||||
}
|
||||
if (msg.hasClaimDevice()) {
|
||||
handleClaimDeviceMsg(context, msg.getSessionInfo(), msg.getClaimDevice());
|
||||
handleClaimDeviceMsg(context, sessionInfo, msg.getClaimDevice());
|
||||
}
|
||||
if (msg.hasPersistedRpcResponseMsg()) {
|
||||
processPersistedRpcResponses(context, sessionInfo, msg.getPersistedRpcResponseMsg());
|
||||
}
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
private void handleClaimDeviceMsg(TbActorCtx context, SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg) {
|
||||
private void handleClaimDeviceMsg(TbActorCtx context, SessionInfoProto sessionInfo, ClaimDeviceMsg msg) {
|
||||
DeviceId deviceId = new DeviceId(new UUID(msg.getDeviceIdMSB(), msg.getDeviceIdLSB()));
|
||||
systemContext.getClaimDevicesService().registerClaimingInfo(tenantId, deviceId, msg.getSecretKey(), msg.getDurationMs());
|
||||
}
|
||||
@ -442,11 +493,22 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
if (success) {
|
||||
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
|
||||
responseMsg.getPayload(), null));
|
||||
if (requestMd.getMsg().getMsg().isPersisted()) {
|
||||
systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.SUCCESSFUL, JacksonUtil.toJsonNode(responseMsg.getPayload()));
|
||||
}
|
||||
} else {
|
||||
log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
|
||||
if (requestMd.getMsg().getMsg().isPersisted()) {
|
||||
systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.FAILED, JacksonUtil.toJsonNode(responseMsg.getPayload()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void processPersistedRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDevicePersistedRpcResponseMsg responseMsg) {
|
||||
UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB());
|
||||
systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), RpcStatus.valueOf(responseMsg.getStatus()), null);
|
||||
}
|
||||
|
||||
private void processSubscriptionCommands(TbActorCtx context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
|
||||
UUID sessionId = getSessionId(sessionInfo);
|
||||
if (subscribeCmd.getUnsubscribe()) {
|
||||
@ -565,7 +627,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
|
||||
void notifyTransportAboutProfileUpdate(UUID sessionId, SessionInfoMetaData sessionMd, DeviceCredentials deviceCredentials) {
|
||||
log.info("2) LwM2Mtype: ");
|
||||
TransportProtos.ToTransportUpdateCredentialsProto.Builder notification = TransportProtos.ToTransportUpdateCredentialsProto.newBuilder();
|
||||
ToTransportUpdateCredentialsProto.Builder notification = ToTransportUpdateCredentialsProto.newBuilder();
|
||||
notification.addCredentialsId(deviceCredentials.getCredentialsId());
|
||||
notification.addCredentialsValue(deviceCredentials.getCredentialsValue());
|
||||
ToTransportMsg msg = ToTransportMsg.newBuilder()
|
||||
@ -640,7 +702,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
ListenableFuture<EdgeEvent> future = systemContext.getEdgeEventService().saveAsync(edgeEvent);
|
||||
Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
|
||||
@Override
|
||||
public void onSuccess( EdgeEvent result) {
|
||||
public void onSuccess(EdgeEvent result) {
|
||||
systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId);
|
||||
}
|
||||
|
||||
@ -756,8 +818,26 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
.addAllSessions(sessionsList).build().toByteArray());
|
||||
}
|
||||
|
||||
void initSessionTimeout(TbActorCtx ctx) {
|
||||
void init(TbActorCtx ctx) {
|
||||
schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout());
|
||||
PageLink pageLink = new PageLink(1024);
|
||||
PageData<Rpc> pageData;
|
||||
do {
|
||||
pageData = systemContext.getTbRpcService().findAllByDeviceIdAndStatus(tenantId, deviceId, RpcStatus.QUEUED, pageLink);
|
||||
pageData.getData().forEach(rpc -> {
|
||||
ToDeviceRpcRequest msg = JacksonUtil.convertValue(rpc.getRequest(), ToDeviceRpcRequest.class);
|
||||
long timeout = rpc.getExpirationTime() - System.currentTimeMillis();
|
||||
if (timeout <= 0) {
|
||||
rpc.setStatus(RpcStatus.TIMEOUT);
|
||||
systemContext.getTbRpcService().save(tenantId, rpc);
|
||||
} else {
|
||||
registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, creteToDeviceRpcRequestMsg(msg), timeout);
|
||||
}
|
||||
});
|
||||
if (pageData.hasNext()) {
|
||||
pageLink = pageLink.nextPageLink();
|
||||
}
|
||||
} while (pageData.hasNext());
|
||||
}
|
||||
|
||||
void checkSessionsTimeout() {
|
||||
|
||||
@ -69,6 +69,7 @@ import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||
import org.thingsboard.server.common.data.id.EntityViewId;
|
||||
import org.thingsboard.server.common.data.id.OtaPackageId;
|
||||
import org.thingsboard.server.common.data.id.RpcId;
|
||||
import org.thingsboard.server.common.data.id.TbResourceId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.RuleNodeId;
|
||||
@ -83,6 +84,7 @@ import org.thingsboard.server.common.data.page.TimePageLink;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentDescriptor;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.rpc.Rpc;
|
||||
import org.thingsboard.server.common.data.rule.RuleChain;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainType;
|
||||
import org.thingsboard.server.common.data.rule.RuleNode;
|
||||
@ -106,6 +108,7 @@ import org.thingsboard.server.dao.model.ModelConstants;
|
||||
import org.thingsboard.server.dao.oauth2.OAuth2ConfigTemplateService;
|
||||
import org.thingsboard.server.dao.oauth2.OAuth2Service;
|
||||
import org.thingsboard.server.dao.relation.RelationService;
|
||||
import org.thingsboard.server.dao.rpc.RpcService;
|
||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.dao.tenant.TenantProfileService;
|
||||
@ -245,6 +248,9 @@ public abstract class BaseController {
|
||||
@Autowired
|
||||
protected OtaPackageStateService otaPackageStateService;
|
||||
|
||||
@Autowired
|
||||
protected RpcService rpcService;
|
||||
|
||||
@Autowired
|
||||
protected TbQueueProducerProvider producerProvider;
|
||||
|
||||
@ -786,6 +792,18 @@ public abstract class BaseController {
|
||||
}
|
||||
}
|
||||
|
||||
Rpc checkRpcId(RpcId rpcId, Operation operation) throws ThingsboardException {
|
||||
try {
|
||||
validateId(rpcId, "Incorrect rpcId " + rpcId);
|
||||
Rpc rpc = rpcService.findById(getCurrentUser().getTenantId(), rpcId);
|
||||
checkNotNull(rpc);
|
||||
accessControlService.checkPermission(getCurrentUser(), Resource.RPC, operation, rpcId, rpc);
|
||||
return rpc;
|
||||
} catch (Exception e) {
|
||||
throw handleException(e, false);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <I extends EntityId> I emptyId(EntityType entityType) {
|
||||
return (I) EntityIdFactory.getByTypeAndUuid(entityType, ModelConstants.NULL_UUID);
|
||||
|
||||
@ -29,6 +29,7 @@ import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMethod;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.ResponseBody;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.context.request.async.DeferredResult;
|
||||
@ -38,8 +39,13 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.RpcId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.UUIDBased;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.rpc.Rpc;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
|
||||
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
@ -93,6 +99,52 @@ public class RpcController extends BaseController {
|
||||
return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody);
|
||||
}
|
||||
|
||||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
|
||||
@RequestMapping(value = "/persisted/{rpcId}", method = RequestMethod.GET)
|
||||
@ResponseBody
|
||||
public Rpc getPersistedRpc(@PathVariable("rpcId") String strRpc) throws ThingsboardException {
|
||||
checkParameter("RpcId", strRpc);
|
||||
try {
|
||||
RpcId rpcId = new RpcId(UUID.fromString(strRpc));
|
||||
return checkRpcId(rpcId, Operation.READ);
|
||||
} catch (Exception e) {
|
||||
throw handleException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
|
||||
@RequestMapping(value = "/persisted/{deviceId}", method = RequestMethod.GET)
|
||||
@ResponseBody
|
||||
public PageData<Rpc> getPersistedRpcByDevice(@PathVariable("deviceId") String strDeviceId,
|
||||
@RequestParam int pageSize,
|
||||
@RequestParam int page,
|
||||
@RequestParam RpcStatus rpcStatus,
|
||||
@RequestParam(required = false) String textSearch,
|
||||
@RequestParam(required = false) String sortProperty,
|
||||
@RequestParam(required = false) String sortOrder) throws ThingsboardException {
|
||||
checkParameter("DeviceId", strDeviceId);
|
||||
try {
|
||||
TenantId tenantId = getCurrentUser().getTenantId();
|
||||
PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder);
|
||||
DeviceId deviceId = new DeviceId(UUID.fromString(strDeviceId));
|
||||
return checkNotNull(rpcService.findAllByDeviceIdAndStatus(tenantId, deviceId, rpcStatus, pageLink));
|
||||
} catch (Exception e) {
|
||||
throw handleException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
|
||||
@RequestMapping(value = "/persisted/{rpcId}", method = RequestMethod.DELETE)
|
||||
@ResponseBody
|
||||
public void deleteResource(@PathVariable("rpcId") String strRpc) throws ThingsboardException {
|
||||
checkParameter("RpcId", strRpc);
|
||||
try {
|
||||
rpcService.deleteRpc(getTenantId(), new RpcId(UUID.fromString(strRpc)));
|
||||
} catch (Exception e) {
|
||||
throw handleException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private DeferredResult<ResponseEntity> handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody) throws ThingsboardException {
|
||||
try {
|
||||
JsonNode rpcRequestBody = jsonMapper.readTree(requestBody);
|
||||
@ -103,6 +155,7 @@ public class RpcController extends BaseController {
|
||||
long timeout = rpcRequestBody.has("timeout") ? rpcRequestBody.get("timeout").asLong() : defaultTimeout;
|
||||
long expTime = System.currentTimeMillis() + Math.max(minTimeout, timeout);
|
||||
UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID();
|
||||
boolean persisted = rpcRequestBody.has("persisted") && rpcRequestBody.get("persisted").asBoolean();
|
||||
accessValidator.validate(currentUser, Operation.RPC_CALL, deviceId, new HttpValidationCallback(response, new FutureCallback<DeferredResult<ResponseEntity>>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
|
||||
@ -111,7 +164,8 @@ public class RpcController extends BaseController {
|
||||
deviceId,
|
||||
oneWay,
|
||||
expTime,
|
||||
body
|
||||
body,
|
||||
persisted
|
||||
);
|
||||
deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse), currentUser);
|
||||
}
|
||||
|
||||
@ -157,6 +157,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
|
||||
metaData.putValue("originServiceId", serviceId);
|
||||
metaData.putValue("expirationTime", Long.toString(msg.getExpirationTime()));
|
||||
metaData.putValue("oneway", Boolean.toString(msg.isOneway()));
|
||||
metaData.putValue("persisted", Boolean.toString(msg.isPersisted()));
|
||||
|
||||
Device device = deviceService.findDeviceById(msg.getTenantId(), msg.getDeviceId());
|
||||
if (device != null) {
|
||||
|
||||
@ -100,7 +100,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
|
||||
@Override
|
||||
public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
|
||||
ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(),
|
||||
src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()));
|
||||
src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted());
|
||||
forwardRpcRequestToDeviceActor(request, response -> {
|
||||
if (src.isRestApiCall()) {
|
||||
sendRpcResponseToTbCore(src.getOriginServiceId(), response);
|
||||
|
||||
@ -0,0 +1,77 @@
|
||||
/**
|
||||
* Copyright © 2016-2021 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.rpc;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.RpcId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.rpc.Rpc;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
import org.thingsboard.server.dao.rpc.RpcService;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.queue.TbClusterService;
|
||||
|
||||
@TbCoreComponent
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class TbRpcService {
|
||||
private final RpcService rpcService;
|
||||
private final TbClusterService tbClusterService;
|
||||
|
||||
public Rpc save(TenantId tenantId, Rpc rpc) {
|
||||
Rpc saved = rpcService.save(rpc);
|
||||
pushRpcMsgToRuleEngine(tenantId, saved);
|
||||
return saved;
|
||||
}
|
||||
|
||||
public void save(TenantId tenantId, RpcId rpcId, RpcStatus newStatus, JsonNode response) {
|
||||
Rpc foundRpc = rpcService.findById(tenantId, rpcId);
|
||||
if (foundRpc != null) {
|
||||
foundRpc.setStatus(newStatus);
|
||||
if (response != null) {
|
||||
foundRpc.setResponse(response);
|
||||
}
|
||||
Rpc saved = rpcService.save(foundRpc);
|
||||
pushRpcMsgToRuleEngine(tenantId, saved);
|
||||
} else {
|
||||
log.warn("[{}] Failed to update RPC status because RPC was already deleted", rpcId);
|
||||
}
|
||||
}
|
||||
|
||||
private void pushRpcMsgToRuleEngine(TenantId tenantId, Rpc rpc) {
|
||||
TbMsg msg = TbMsg.newMsg("RPC_" + rpc.getStatus().name(), rpc.getDeviceId(), TbMsgMetaData.EMPTY, JacksonUtil.toString(rpc));
|
||||
tbClusterService.pushMsgToRuleEngine(tenantId, rpc.getId(), msg, null);
|
||||
}
|
||||
|
||||
public Rpc findRpcById(TenantId tenantId, RpcId rpcId) {
|
||||
return rpcService.findById(tenantId, rpcId);
|
||||
}
|
||||
|
||||
public PageData<Rpc> findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) {
|
||||
return rpcService.findAllByDeviceIdAndStatus(tenantId, deviceId, rpcStatus, pageLink);
|
||||
}
|
||||
|
||||
}
|
||||
@ -47,11 +47,13 @@ import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||
import org.thingsboard.server.common.data.id.EntityViewId;
|
||||
import org.thingsboard.server.common.data.id.OtaPackageId;
|
||||
import org.thingsboard.server.common.data.id.RpcId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.RuleNodeId;
|
||||
import org.thingsboard.server.common.data.id.TbResourceId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.UserId;
|
||||
import org.thingsboard.server.common.data.rpc.Rpc;
|
||||
import org.thingsboard.server.common.data.rule.RuleChain;
|
||||
import org.thingsboard.server.common.data.rule.RuleNode;
|
||||
import org.thingsboard.server.controller.HttpValidationCallback;
|
||||
@ -65,6 +67,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService;
|
||||
import org.thingsboard.server.dao.exception.IncorrectParameterException;
|
||||
import org.thingsboard.server.dao.ota.OtaPackageService;
|
||||
import org.thingsboard.server.dao.resource.ResourceService;
|
||||
import org.thingsboard.server.dao.rpc.RpcService;
|
||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||
import org.thingsboard.server.dao.tenant.TenantService;
|
||||
import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
|
||||
@ -137,6 +140,9 @@ public class AccessValidator {
|
||||
@Autowired
|
||||
protected OtaPackageService otaPackageService;
|
||||
|
||||
@Autowired
|
||||
protected RpcService rpcService;
|
||||
|
||||
private ExecutorService executor;
|
||||
|
||||
@PostConstruct
|
||||
@ -235,6 +241,9 @@ public class AccessValidator {
|
||||
case OTA_PACKAGE:
|
||||
validateOtaPackage(currentUser, operation, entityId, callback);
|
||||
return;
|
||||
case RPC:
|
||||
validateRpc(currentUser, operation, entityId, callback);
|
||||
return;
|
||||
default:
|
||||
//TODO: add support of other entities
|
||||
throw new IllegalStateException("Not Implemented!");
|
||||
@ -261,6 +270,22 @@ public class AccessValidator {
|
||||
}
|
||||
}
|
||||
|
||||
private void validateRpc(final SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) {
|
||||
ListenableFuture<Rpc> rpcFurure = rpcService.findRpcByIdAsync(currentUser.getTenantId(), new RpcId(entityId.getId()));
|
||||
Futures.addCallback(rpcFurure, getCallback(callback, rpc -> {
|
||||
if (rpc == null) {
|
||||
return ValidationResult.entityNotFound("Rpc with requested id wasn't found!");
|
||||
} else {
|
||||
try {
|
||||
accessControlService.checkPermission(currentUser, Resource.RPC, operation, entityId, rpc);
|
||||
} catch (ThingsboardException e) {
|
||||
return ValidationResult.accessDenied(e.getMessage());
|
||||
}
|
||||
return ValidationResult.ok(rpc);
|
||||
}
|
||||
}), executor);
|
||||
}
|
||||
|
||||
private void validateDeviceProfile(final SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) {
|
||||
if (currentUser.isSystemAdmin()) {
|
||||
callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
|
||||
|
||||
@ -41,6 +41,7 @@ public class CustomerUserPermissions extends AbstractPermissions {
|
||||
put(Resource.WIDGETS_BUNDLE, widgetsPermissionChecker);
|
||||
put(Resource.WIDGET_TYPE, widgetsPermissionChecker);
|
||||
put(Resource.EDGE, customerEntityPermissionChecker);
|
||||
put(Resource.RPC, rpcPermissionChecker);
|
||||
}
|
||||
|
||||
private static final PermissionChecker customerEntityPermissionChecker =
|
||||
@ -138,4 +139,22 @@ public class CustomerUserPermissions extends AbstractPermissions {
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
private static final PermissionChecker rpcPermissionChecker = new PermissionChecker.GenericPermissionChecker(Operation.READ) {
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public boolean hasPermission(SecurityUser user, Operation operation, EntityId entityId, HasTenantId entity) {
|
||||
if (!super.hasPermission(user, operation, entityId, entity)) {
|
||||
return false;
|
||||
}
|
||||
if (entity.getTenantId() == null || entity.getTenantId().isNullUid()) {
|
||||
return true;
|
||||
}
|
||||
if (!user.getTenantId().equals(entity.getTenantId())) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@ -39,7 +39,8 @@ public enum Resource {
|
||||
API_USAGE_STATE(EntityType.API_USAGE_STATE),
|
||||
TB_RESOURCE(EntityType.TB_RESOURCE),
|
||||
OTA_PACKAGE(EntityType.OTA_PACKAGE),
|
||||
EDGE(EntityType.EDGE);
|
||||
EDGE(EntityType.EDGE),
|
||||
RPC(EntityType.RPC);
|
||||
|
||||
private final EntityType entityType;
|
||||
|
||||
|
||||
@ -44,6 +44,7 @@ public class TenantAdminPermissions extends AbstractPermissions {
|
||||
put(Resource.TB_RESOURCE, tbResourcePermissionChecker);
|
||||
put(Resource.OTA_PACKAGE, tenantEntityPermissionChecker);
|
||||
put(Resource.EDGE, tenantEntityPermissionChecker);
|
||||
put(Resource.RPC, tenantEntityPermissionChecker);
|
||||
}
|
||||
|
||||
public static final PermissionChecker tenantEntityPermissionChecker = new PermissionChecker() {
|
||||
|
||||
@ -22,6 +22,7 @@ import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.protobuf.ByteString;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
@ -41,13 +42,13 @@ import org.thingsboard.server.common.data.TenantProfile;
|
||||
import org.thingsboard.server.common.data.device.credentials.BasicMqttCredentials;
|
||||
import org.thingsboard.server.common.data.device.credentials.ProvisionDeviceCredentialsData;
|
||||
import org.thingsboard.server.common.data.device.profile.ProvisionDeviceProfileCredentials;
|
||||
import org.thingsboard.server.common.data.ota.OtaPackageType;
|
||||
import org.thingsboard.server.common.data.ota.OtaPackageUtil;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.OtaPackageId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.ota.OtaPackageType;
|
||||
import org.thingsboard.server.common.data.ota.OtaPackageUtil;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
@ -108,6 +109,7 @@ import java.util.stream.Collectors;
|
||||
@Slf4j
|
||||
@Service
|
||||
@TbCoreComponent
|
||||
@RequiredArgsConstructor
|
||||
public class DefaultTransportApiService implements TransportApiService {
|
||||
|
||||
private static final ObjectMapper mapper = new ObjectMapper();
|
||||
@ -129,28 +131,6 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
|
||||
private final ConcurrentMap<String, ReentrantLock> deviceCreationLocks = new ConcurrentHashMap<>();
|
||||
|
||||
public DefaultTransportApiService(TbDeviceProfileCache deviceProfileCache,
|
||||
TbTenantProfileCache tenantProfileCache, TbApiUsageStateService apiUsageStateService, DeviceService deviceService,
|
||||
RelationService relationService, DeviceCredentialsService deviceCredentialsService,
|
||||
DeviceStateService deviceStateService, DbCallbackExecutorService dbCallbackExecutorService,
|
||||
TbClusterService tbClusterService, DataDecodingEncodingService dataDecodingEncodingService,
|
||||
DeviceProvisionService deviceProvisionService, TbResourceService resourceService, OtaPackageService otaPackageService, OtaPackageDataCache otaPackageDataCache) {
|
||||
this.deviceProfileCache = deviceProfileCache;
|
||||
this.tenantProfileCache = tenantProfileCache;
|
||||
this.apiUsageStateService = apiUsageStateService;
|
||||
this.deviceService = deviceService;
|
||||
this.relationService = relationService;
|
||||
this.deviceCredentialsService = deviceCredentialsService;
|
||||
this.deviceStateService = deviceStateService;
|
||||
this.dbCallbackExecutorService = dbCallbackExecutorService;
|
||||
this.tbClusterService = tbClusterService;
|
||||
this.dataDecodingEncodingService = dataDecodingEncodingService;
|
||||
this.deviceProvisionService = deviceProvisionService;
|
||||
this.resourceService = resourceService;
|
||||
this.otaPackageService = otaPackageService;
|
||||
this.otaPackageDataCache = otaPackageDataCache;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProtoQueueMsg<TransportApiRequestMsg> tbProtoQueueMsg) {
|
||||
TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue();
|
||||
|
||||
@ -0,0 +1,83 @@
|
||||
/**
|
||||
* Copyright © 2016-2021 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.ttl.rpc;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.dao.rpc.RpcDao;
|
||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.dao.tenant.TenantDao;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@TbCoreComponent
|
||||
@Service
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class RpcCleanUpService {
|
||||
@Value("${sql.ttl.rpc.enabled}")
|
||||
private boolean ttlTaskExecutionEnabled;
|
||||
|
||||
private final TenantDao tenantDao;
|
||||
private final PartitionService partitionService;
|
||||
private final TbTenantProfileCache tenantProfileCache;
|
||||
private final RpcDao rpcDao;
|
||||
|
||||
@Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.rpc.checking_interval})}", fixedDelayString = "${sql.ttl.rpc.checking_interval}")
|
||||
public void cleanUp() {
|
||||
if (ttlTaskExecutionEnabled) {
|
||||
PageLink tenantsBatchRequest = new PageLink(10_000, 0);
|
||||
PageData<TenantId> tenantsIds;
|
||||
do {
|
||||
tenantsIds = tenantDao.findTenantsIds(tenantsBatchRequest);
|
||||
for (TenantId tenantId : tenantsIds.getData()) {
|
||||
if (!partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Optional<DefaultTenantProfileConfiguration> tenantProfileConfiguration = tenantProfileCache.get(tenantId).getProfileConfiguration();
|
||||
if (tenantProfileConfiguration.isEmpty() || tenantProfileConfiguration.get().getRpcTtlDays() == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
long ttl = TimeUnit.DAYS.toMillis(tenantProfileConfiguration.get().getRpcTtlDays());
|
||||
long expirationTime = System.currentTimeMillis() - ttl;
|
||||
|
||||
long totalRemoved = rpcDao.deleteOutdatedRpcByTenantId(tenantId, expirationTime);
|
||||
|
||||
if (totalRemoved > 0) {
|
||||
log.info("Removed {} outdated rpc(s) for tenant {} older than {}", totalRemoved, tenantId, new Date(expirationTime));
|
||||
}
|
||||
}
|
||||
|
||||
tenantsBatchRequest = tenantsBatchRequest.nextPageLink();
|
||||
} while (tenantsIds.hasNext());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -276,6 +276,9 @@ sql:
|
||||
alarms:
|
||||
checking_interval: "${SQL_ALARMS_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours
|
||||
removal_batch_size: "${SQL_ALARMS_TTL_REMOVAL_BATCH_SIZE:3000}" # To delete outdated alarms not all at once but in batches
|
||||
rpc:
|
||||
enabled: "${SQL_TTL_RPC_ENABLED:true}"
|
||||
checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours
|
||||
|
||||
# Actor system parameters
|
||||
actors:
|
||||
|
||||
@ -75,13 +75,11 @@ public class X509LwM2MIntegrationTest extends AbstractLwM2MIntegrationTest {
|
||||
return device;
|
||||
}
|
||||
|
||||
//TODO: use different endpoints to isolate tests.
|
||||
@Ignore()
|
||||
@Test
|
||||
public void testConnectAndObserveTelemetry() throws Exception {
|
||||
createDeviceProfile(TRANSPORT_CONFIGURATION);
|
||||
X509ClientCredentials credentials = new X509ClientCredentials();
|
||||
credentials.setEndpoint(endpoint+1);
|
||||
credentials.setEndpoint(endpoint);
|
||||
Device device = createDevice(credentials);
|
||||
|
||||
SingleEntityFilter sef = new SingleEntityFilter();
|
||||
@ -99,7 +97,7 @@ public class X509LwM2MIntegrationTest extends AbstractLwM2MIntegrationTest {
|
||||
wsClient.waitForReply();
|
||||
|
||||
wsClient.registerWaitForUpdate();
|
||||
LwM2MTestClient client = new LwM2MTestClient(executor, endpoint+1);
|
||||
LwM2MTestClient client = new LwM2MTestClient(executor, endpoint);
|
||||
Security security = x509(serverUri, 123, clientX509Cert.getEncoded(), clientPrivateKeyFromCert.getEncoded(), serverX509Cert.getEncoded());
|
||||
client.init(security, coapConfig);
|
||||
String msg = wsClient.waitForUpdate();
|
||||
|
||||
@ -0,0 +1,39 @@
|
||||
/**
|
||||
* Copyright © 2016-2021 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.rpc;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.RpcId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.rpc.Rpc;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
|
||||
public interface RpcService {
|
||||
Rpc save(Rpc rpc);
|
||||
|
||||
void deleteRpc(TenantId tenantId, RpcId id);
|
||||
|
||||
void deleteAllRpcByTenantId(TenantId tenantId);
|
||||
|
||||
Rpc findById(TenantId tenantId, RpcId id);
|
||||
|
||||
ListenableFuture<Rpc> findRpcByIdAsync(TenantId tenantId, RpcId id);
|
||||
|
||||
PageData<Rpc> findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink);
|
||||
}
|
||||
@ -76,6 +76,12 @@ public class DataConstants {
|
||||
|
||||
public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE";
|
||||
|
||||
public static final String RPC_QUEUED = "RPC_QUEUED";
|
||||
public static final String RPC_DELIVERED = "RPC_DELIVERED";
|
||||
public static final String RPC_SUCCESSFUL = "RPC_SUCCESSFUL";
|
||||
public static final String RPC_TIMEOUT = "RPC_TIMEOUT";
|
||||
public static final String RPC_FAILED = "RPC_FAILED";
|
||||
|
||||
public static final String DEFAULT_SECRET_KEY = "";
|
||||
public static final String SECRET_KEY_FIELD_NAME = "secretKey";
|
||||
public static final String DURATION_MS_FIELD_NAME = "durationMs";
|
||||
|
||||
@ -19,5 +19,5 @@ package org.thingsboard.server.common.data;
|
||||
* @author Andrew Shvayka
|
||||
*/
|
||||
public enum EntityType {
|
||||
TENANT, CUSTOMER, USER, DASHBOARD, ASSET, DEVICE, ALARM, RULE_CHAIN, RULE_NODE, ENTITY_VIEW, WIDGETS_BUNDLE, WIDGET_TYPE, TENANT_PROFILE, DEVICE_PROFILE, API_USAGE_STATE, TB_RESOURCE, OTA_PACKAGE, EDGE;
|
||||
TENANT, CUSTOMER, USER, DASHBOARD, ASSET, DEVICE, ALARM, RULE_CHAIN, RULE_NODE, ENTITY_VIEW, WIDGETS_BUNDLE, WIDGET_TYPE, TENANT_PROFILE, DEVICE_PROFILE, API_USAGE_STATE, TB_RESOURCE, OTA_PACKAGE, EDGE, RPC;
|
||||
}
|
||||
|
||||
@ -75,6 +75,8 @@ public class EntityIdFactory {
|
||||
return new OtaPackageId(uuid);
|
||||
case EDGE:
|
||||
return new EdgeId(uuid);
|
||||
case RPC:
|
||||
return new RpcId(uuid);
|
||||
}
|
||||
throw new IllegalArgumentException("EntityType " + type + " is not supported!");
|
||||
}
|
||||
|
||||
@ -0,0 +1,39 @@
|
||||
/**
|
||||
* Copyright © 2016-2021 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.data.id;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
public final class RpcId extends UUIDBased implements EntityId {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@JsonCreator
|
||||
public RpcId(@JsonProperty("id") UUID id) {
|
||||
super(id);
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
@Override
|
||||
public EntityType getEntityType() {
|
||||
return EntityType.RPC;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,54 @@
|
||||
/**
|
||||
* Copyright © 2016-2021 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.data.rpc;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import org.thingsboard.server.common.data.BaseData;
|
||||
import org.thingsboard.server.common.data.HasTenantId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.RpcId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
|
||||
@Data
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public class Rpc extends BaseData<RpcId> implements HasTenantId {
|
||||
private TenantId tenantId;
|
||||
private DeviceId deviceId;
|
||||
private long expirationTime;
|
||||
private JsonNode request;
|
||||
private JsonNode response;
|
||||
private RpcStatus status;
|
||||
|
||||
public Rpc() {
|
||||
super();
|
||||
}
|
||||
|
||||
public Rpc(RpcId id) {
|
||||
super(id);
|
||||
}
|
||||
|
||||
public Rpc(Rpc rpc) {
|
||||
super(rpc);
|
||||
this.tenantId = rpc.getTenantId();
|
||||
this.deviceId = rpc.getDeviceId();
|
||||
this.expirationTime = rpc.getExpirationTime();
|
||||
this.request = rpc.getRequest();
|
||||
this.response = rpc.getResponse();
|
||||
this.status = rpc.getStatus();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,20 @@
|
||||
/**
|
||||
* Copyright © 2016-2021 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.data.rpc;
|
||||
|
||||
public enum RpcStatus {
|
||||
QUEUED, DELIVERED, SUCCESSFUL, TIMEOUT, FAILED
|
||||
}
|
||||
@ -56,6 +56,7 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
|
||||
|
||||
private int defaultStorageTtlDays;
|
||||
private int alarmsTtlDays;
|
||||
private int rpcTtlDays;
|
||||
|
||||
private double warnThreshold;
|
||||
|
||||
|
||||
@ -34,5 +34,6 @@ public class ToDeviceRpcRequest implements Serializable {
|
||||
private final boolean oneway;
|
||||
private final long expirationTime;
|
||||
private final ToDeviceRpcRequestBody body;
|
||||
private final boolean persisted;
|
||||
}
|
||||
|
||||
|
||||
@ -318,6 +318,9 @@ message SubscribeToRPCMsg {
|
||||
SessionType sessionType = 2;
|
||||
}
|
||||
|
||||
message SendPendingRPCMsg {
|
||||
}
|
||||
|
||||
message ToDeviceRpcRequestMsg {
|
||||
int32 requestId = 1;
|
||||
string methodName = 2;
|
||||
@ -325,6 +328,8 @@ message ToDeviceRpcRequestMsg {
|
||||
int64 expirationTime = 4;
|
||||
int64 requestIdMSB = 5;
|
||||
int64 requestIdLSB = 6;
|
||||
bool oneway = 7;
|
||||
bool persisted = 8;
|
||||
}
|
||||
|
||||
message ToDeviceRpcResponseMsg {
|
||||
@ -332,6 +337,13 @@ message ToDeviceRpcResponseMsg {
|
||||
string payload = 2;
|
||||
}
|
||||
|
||||
message ToDevicePersistedRpcResponseMsg {
|
||||
int32 requestId = 1;
|
||||
int64 requestIdMSB = 2;
|
||||
int64 requestIdLSB = 3;
|
||||
string status = 4;
|
||||
}
|
||||
|
||||
message ToServerRpcRequestMsg {
|
||||
int32 requestId = 1;
|
||||
string methodName = 2;
|
||||
@ -435,6 +447,8 @@ message TransportToDeviceActorMsg {
|
||||
SubscriptionInfoProto subscriptionInfo = 7;
|
||||
ClaimDeviceMsg claimDevice = 8;
|
||||
ProvisionDeviceRequestMsg provisionDevice = 9;
|
||||
ToDevicePersistedRpcResponseMsg persistedRpcResponseMsg = 10;
|
||||
SendPendingRPCMsg sendPendingRPC = 11;
|
||||
}
|
||||
|
||||
message TransportToRuleEngineMsg {
|
||||
|
||||
@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportC
|
||||
import org.thingsboard.server.common.data.device.profile.JsonTransportPayloadConfiguration;
|
||||
import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
|
||||
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
|
||||
import org.thingsboard.server.common.msg.session.FeatureType;
|
||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
|
||||
@ -332,14 +333,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
|
||||
break;
|
||||
case TO_SERVER_RPC_REQUEST:
|
||||
transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor,
|
||||
transportConfigurationContainer.getRpcRequestDynamicMessageBuilder()), timeout);
|
||||
transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), sessionInfo), timeout);
|
||||
transportService.process(sessionInfo,
|
||||
coapTransportAdaptor.convertToServerRpcRequest(sessionId, request),
|
||||
new CoapNoOpCallback(exchange));
|
||||
break;
|
||||
case GET_ATTRIBUTES_REQUEST:
|
||||
transportService.registerSyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor,
|
||||
transportConfigurationContainer.getRpcRequestDynamicMessageBuilder()), timeout);
|
||||
transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), sessionInfo), timeout);
|
||||
transportService.process(sessionInfo,
|
||||
coapTransportAdaptor.convertToGetAttributes(sessionId, request),
|
||||
new CoapNoOpCallback(exchange));
|
||||
@ -362,12 +363,12 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
|
||||
|
||||
private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) {
|
||||
tokenToSessionInfoMap.putIfAbsent(token, sessionInfo);
|
||||
transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder));
|
||||
transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo));
|
||||
transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
|
||||
}
|
||||
|
||||
private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) {
|
||||
return new CoapSessionListener(this, exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder);
|
||||
private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) {
|
||||
return new CoapSessionListener(this, exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder, sessionInfo);
|
||||
}
|
||||
|
||||
private String getTokenFromRequest(Request request) {
|
||||
@ -455,12 +456,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
|
||||
private final CoapExchange exchange;
|
||||
private final CoapTransportAdaptor coapTransportAdaptor;
|
||||
private final DynamicMessage.Builder rpcRequestDynamicMessageBuilder;
|
||||
private final TransportProtos.SessionInfoProto sessionInfo;
|
||||
|
||||
CoapSessionListener(CoapTransportResource coapTransportResource, CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) {
|
||||
CoapSessionListener(CoapTransportResource coapTransportResource, CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, TransportProtos.SessionInfoProto sessionInfo) {
|
||||
this.coapTransportResource = coapTransportResource;
|
||||
this.exchange = exchange;
|
||||
this.coapTransportAdaptor = coapTransportAdaptor;
|
||||
this.rpcRequestDynamicMessageBuilder = rpcRequestDynamicMessageBuilder;
|
||||
this.sessionInfo = sessionInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -503,11 +506,31 @@ public class CoapTransportResource extends AbstractCoapTransportResource {
|
||||
|
||||
@Override
|
||||
public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg msg) {
|
||||
boolean successful;
|
||||
try {
|
||||
exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg, rpcRequestDynamicMessageBuilder));
|
||||
successful = true;
|
||||
} catch (AdaptorException e) {
|
||||
log.trace("Failed to reply due to error", e);
|
||||
exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR);
|
||||
successful = false;
|
||||
}
|
||||
if (msg.getPersisted()) {
|
||||
RpcStatus status;
|
||||
if (!successful) {
|
||||
status = RpcStatus.FAILED;
|
||||
} else if (msg.getOneway()) {
|
||||
status = RpcStatus.SUCCESSFUL;
|
||||
} else {
|
||||
status = RpcStatus.DELIVERED;
|
||||
}
|
||||
TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
|
||||
.setRequestId(msg.getRequestId())
|
||||
.setRequestIdLSB(msg.getRequestIdLSB())
|
||||
.setRequestIdMSB(msg.getRequestIdMSB())
|
||||
.setStatus(status.name())
|
||||
.build();
|
||||
coapTransportResource.transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -17,6 +17,7 @@ package org.thingsboard.server.transport.http;
|
||||
|
||||
import com.google.gson.JsonObject;
|
||||
import com.google.gson.JsonParser;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
@ -34,9 +35,10 @@ import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.context.request.async.DeferredResult;
|
||||
import org.thingsboard.server.common.data.DeviceTransportType;
|
||||
import org.thingsboard.server.common.data.ota.OtaPackageType;
|
||||
import org.thingsboard.server.common.data.TbTransportService;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.ota.OtaPackageType;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
import org.thingsboard.server.common.transport.SessionMsgListener;
|
||||
import org.thingsboard.server.common.transport.TransportContext;
|
||||
import org.thingsboard.server.common.transport.TransportService;
|
||||
@ -95,7 +97,9 @@ public class DeviceApiController implements TbTransportService {
|
||||
request.addAllSharedAttributeNames(sharedKeySet);
|
||||
}
|
||||
TransportService transportService = transportContext.getTransportService();
|
||||
transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout());
|
||||
transportService.registerSyncSession(sessionInfo,
|
||||
new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo),
|
||||
transportContext.getDefaultTimeout());
|
||||
transportService.process(sessionInfo, request.build(), new SessionCloseOnErrorCallback(transportService, sessionInfo));
|
||||
}));
|
||||
return responseWriter;
|
||||
@ -151,7 +155,8 @@ public class DeviceApiController implements TbTransportService {
|
||||
transportContext.getTransportService().process(DeviceTransportType.DEFAULT, ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
|
||||
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
|
||||
TransportService transportService = transportContext.getTransportService();
|
||||
transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter),
|
||||
transportService.registerSyncSession(sessionInfo,
|
||||
new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo),
|
||||
timeout == 0 ? transportContext.getDefaultTimeout() : timeout);
|
||||
transportService.process(sessionInfo, SubscribeToRPCMsg.getDefaultInstance(),
|
||||
new SessionCloseOnErrorCallback(transportService, sessionInfo));
|
||||
@ -181,7 +186,9 @@ public class DeviceApiController implements TbTransportService {
|
||||
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
|
||||
JsonObject request = new JsonParser().parse(json).getAsJsonObject();
|
||||
TransportService transportService = transportContext.getTransportService();
|
||||
transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout());
|
||||
transportService.registerSyncSession(sessionInfo,
|
||||
new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo),
|
||||
transportContext.getDefaultTimeout());
|
||||
transportService.process(sessionInfo, ToServerRpcRequestMsg.newBuilder().setRequestId(0)
|
||||
.setMethodName(request.get("method").getAsString())
|
||||
.setParams(request.get("params").toString()).build(),
|
||||
@ -198,7 +205,8 @@ public class DeviceApiController implements TbTransportService {
|
||||
transportContext.getTransportService().process(DeviceTransportType.DEFAULT, ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
|
||||
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
|
||||
TransportService transportService = transportContext.getTransportService();
|
||||
transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter),
|
||||
transportService.registerSyncSession(sessionInfo,
|
||||
new HttpSessionListener(responseWriter, transportContext.getTransportService(), sessionInfo),
|
||||
timeout == 0 ? transportContext.getDefaultTimeout() : timeout);
|
||||
transportService.process(sessionInfo, SubscribeToAttributeUpdatesMsg.getDefaultInstance(),
|
||||
new SessionCloseOnErrorCallback(transportService, sessionInfo));
|
||||
@ -372,13 +380,12 @@ public class DeviceApiController implements TbTransportService {
|
||||
}
|
||||
}
|
||||
|
||||
@RequiredArgsConstructor
|
||||
private static class HttpSessionListener implements SessionMsgListener {
|
||||
|
||||
private final DeferredResult<ResponseEntity> responseWriter;
|
||||
|
||||
HttpSessionListener(DeferredResult<ResponseEntity> responseWriter) {
|
||||
this.responseWriter = responseWriter;
|
||||
}
|
||||
private final TransportService transportService;
|
||||
private final SessionInfoProto sessionInfo;
|
||||
|
||||
@Override
|
||||
public void onGetAttributesResponse(GetAttributeResponseMsg msg) {
|
||||
@ -399,6 +406,21 @@ public class DeviceApiController implements TbTransportService {
|
||||
@Override
|
||||
public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg msg) {
|
||||
responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));
|
||||
if (msg.getPersisted()) {
|
||||
RpcStatus status;
|
||||
if (msg.getOneway()) {
|
||||
status = RpcStatus.SUCCESSFUL;
|
||||
} else {
|
||||
status = RpcStatus.DELIVERED;
|
||||
}
|
||||
TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
|
||||
.setRequestId(msg.getRequestId())
|
||||
.setRequestIdLSB(msg.getRequestIdLSB())
|
||||
.setRequestIdMSB(msg.getRequestIdMSB())
|
||||
.setStatus(status.name())
|
||||
.build();
|
||||
transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -155,7 +155,7 @@ public class LwM2MBootstrapSecurityStore implements BootstrapSecurityStore {
|
||||
LwM2MServerBootstrap profileLwm2mServer = JacksonUtil.fromString(JacksonUtil.toString(bootstrapObject.getLwm2mServer()), LwM2MServerBootstrap.class);
|
||||
UUID sessionUUiD = UUID.randomUUID();
|
||||
TransportProtos.SessionInfoProto sessionInfo = helper.getValidateSessionInfo(store.getMsg(), sessionUUiD.getMostSignificantBits(), sessionUUiD.getLeastSignificantBits());
|
||||
context.getTransportService().registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(null, null, null, sessionInfo));
|
||||
context.getTransportService().registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(null, null, null, sessionInfo, context.getTransportService()));
|
||||
if (this.getValidatedSecurityMode(lwM2MBootstrapConfig.bootstrapServer, profileServerBootstrap, lwM2MBootstrapConfig.lwm2mServer, profileLwm2mServer)) {
|
||||
lwM2MBootstrapConfig.bootstrapServer = new LwM2MServerBootstrap(lwM2MBootstrapConfig.bootstrapServer, profileServerBootstrap);
|
||||
lwM2MBootstrapConfig.lwm2mServer = new LwM2MServerBootstrap(lwM2MBootstrapConfig.lwm2mServer, profileLwm2mServer);
|
||||
|
||||
@ -42,8 +42,8 @@ import org.thingsboard.server.common.transport.util.SslUtil;
|
||||
import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
|
||||
import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
|
||||
import org.thingsboard.server.transport.lwm2m.secure.credentials.LwM2MCredentials;
|
||||
import org.thingsboard.server.transport.lwm2m.server.store.TbEditableSecurityStore;
|
||||
import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MDtlsSessionStore;
|
||||
import org.thingsboard.server.transport.lwm2m.server.store.TbMainSecurityStore;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.security.auth.x500.X500Principal;
|
||||
@ -67,7 +67,7 @@ public class TbLwM2MDtlsCertificateVerifier implements NewAdvancedCertificateVer
|
||||
private final TbLwM2MDtlsSessionStore sessionStorage;
|
||||
private final LwM2MTransportServerConfig config;
|
||||
private final LwM2mCredentialsSecurityInfoValidator securityInfoValidator;
|
||||
private final TbEditableSecurityStore securityStore;
|
||||
private final TbMainSecurityStore securityStore;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private StaticCertificateVerifier staticCertificateVerifier;
|
||||
@ -134,7 +134,7 @@ public class TbLwM2MDtlsCertificateVerifier implements NewAdvancedCertificateVer
|
||||
if (msg.hasDeviceInfo() && deviceProfile != null) {
|
||||
sessionStorage.put(endpoint, new TbX509DtlsSessionInfo(cert.getSubjectX500Principal().getName(), msg));
|
||||
try {
|
||||
securityStore.put(securityInfo);
|
||||
securityStore.putX509(securityInfo);
|
||||
} catch (NonUniqueSecurityInfoException e) {
|
||||
log.trace("Failed to add security info: {}", securityInfo, e);
|
||||
}
|
||||
|
||||
@ -23,7 +23,10 @@ import org.jetbrains.annotations.NotNull;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.ResourceType;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
import org.thingsboard.server.common.transport.SessionMsgListener;
|
||||
import org.thingsboard.server.common.transport.TransportService;
|
||||
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
|
||||
@ -45,6 +48,7 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s
|
||||
private final LwM2MAttributesService attributesService;
|
||||
private final LwM2MRpcRequestHandler rpcHandler;
|
||||
private final TransportProtos.SessionInfoProto sessionInfo;
|
||||
private final TransportService transportService;
|
||||
|
||||
@Override
|
||||
public void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse) {
|
||||
@ -78,7 +82,22 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s
|
||||
|
||||
@Override
|
||||
public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) {
|
||||
this.rpcHandler.onToDeviceRpcRequest(toDeviceRequest,this.sessionInfo);
|
||||
this.rpcHandler.onToDeviceRpcRequest(toDeviceRequest, this.sessionInfo);
|
||||
if (toDeviceRequest.getPersisted()) {
|
||||
RpcStatus status;
|
||||
if (toDeviceRequest.getOneway()) {
|
||||
status = RpcStatus.SUCCESSFUL;
|
||||
} else {
|
||||
status = RpcStatus.DELIVERED;
|
||||
}
|
||||
TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
|
||||
.setRequestId(toDeviceRequest.getRequestId())
|
||||
.setRequestIdLSB(toDeviceRequest.getRequestIdLSB())
|
||||
.setRequestIdMSB(toDeviceRequest.getRequestIdMSB())
|
||||
.setStatus(status.name())
|
||||
.build();
|
||||
transportService.process(sessionInfo, responseMsg, TransportServiceCallback.EMPTY);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -17,6 +17,7 @@ package org.thingsboard.server.transport.lwm2m.server.client;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.leshan.core.SecurityMode;
|
||||
import org.eclipse.leshan.core.model.ResourceModel;
|
||||
import org.eclipse.leshan.core.node.LwM2mPath;
|
||||
import org.eclipse.leshan.server.registration.Registration;
|
||||
@ -30,7 +31,7 @@ import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
|
||||
import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo;
|
||||
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext;
|
||||
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil;
|
||||
import org.thingsboard.server.transport.lwm2m.server.store.TbEditableSecurityStore;
|
||||
import org.thingsboard.server.transport.lwm2m.server.store.TbMainSecurityStore;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
@ -54,7 +55,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
|
||||
|
||||
private final LwM2mTransportContext context;
|
||||
private final LwM2MTransportServerConfig config;
|
||||
private final TbEditableSecurityStore securityStore;
|
||||
private final TbMainSecurityStore securityStore;
|
||||
private final Map<String, LwM2mClient> lwM2mClientsByEndpoint = new ConcurrentHashMap<>();
|
||||
private final Map<String, LwM2mClient> lwM2mClientsByRegistrationId = new ConcurrentHashMap<>();
|
||||
private final Map<UUID, Lwm2mDeviceProfileTransportConfiguration> profiles = new ConcurrentHashMap<>();
|
||||
@ -75,6 +76,9 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
|
||||
oldSession = lwM2MClient.getSession();
|
||||
TbLwM2MSecurityInfo securityInfo = securityStore.getTbLwM2MSecurityInfoByEndpoint(lwM2MClient.getEndpoint());
|
||||
if (securityInfo.getSecurityMode() != null) {
|
||||
if (SecurityMode.X509.equals(securityInfo.getSecurityMode())) {
|
||||
securityStore.registerX509(registration.getEndpoint(), registration.getId());
|
||||
}
|
||||
if (securityInfo.getDeviceProfile() != null) {
|
||||
profileUpdate(securityInfo.getDeviceProfile());
|
||||
if (securityInfo.getSecurityInfo() != null) {
|
||||
@ -124,7 +128,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
|
||||
if (currentRegistration.getId().equals(registration.getId())) {
|
||||
lwM2MClient.setState(LwM2MClientState.UNREGISTERED);
|
||||
lwM2mClientsByEndpoint.remove(lwM2MClient.getEndpoint());
|
||||
this.securityStore.remove(lwM2MClient.getEndpoint());
|
||||
this.securityStore.remove(lwM2MClient.getEndpoint(), registration.getId());
|
||||
UUID profileId = lwM2MClient.getProfileId();
|
||||
if (profileId != null) {
|
||||
Optional<LwM2mClient> otherClients = lwM2mClientsByRegistrationId.values().stream().filter(e -> e.getProfileId().equals(profileId)).findFirst();
|
||||
|
||||
@ -22,13 +22,22 @@ import org.jetbrains.annotations.Nullable;
|
||||
import org.thingsboard.server.transport.lwm2m.secure.LwM2mCredentialsSecurityInfoValidator;
|
||||
import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import static org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mTypeServer.CLIENT;
|
||||
|
||||
@Slf4j
|
||||
public class TbLwM2mSecurityStore implements TbEditableSecurityStore {
|
||||
public class TbLwM2mSecurityStore implements TbMainSecurityStore {
|
||||
|
||||
private final TbEditableSecurityStore securityStore;
|
||||
private final LwM2mCredentialsSecurityInfoValidator validator;
|
||||
private final ConcurrentMap<String, Set<String>> endpointRegistrations = new ConcurrentHashMap<>();
|
||||
|
||||
public TbLwM2mSecurityStore(TbEditableSecurityStore securityStore, LwM2mCredentialsSecurityInfoValidator validator) {
|
||||
this.securityStore = securityStore;
|
||||
@ -61,24 +70,42 @@ public class TbLwM2mSecurityStore implements TbEditableSecurityStore {
|
||||
@Nullable
|
||||
public SecurityInfo fetchAndPutSecurityInfo(String credentialsId) {
|
||||
TbLwM2MSecurityInfo securityInfo = validator.getEndpointSecurityInfoByCredentialsId(credentialsId, CLIENT);
|
||||
try {
|
||||
if (securityInfo != null) {
|
||||
securityStore.put(securityInfo);
|
||||
}
|
||||
} catch (NonUniqueSecurityInfoException e) {
|
||||
log.trace("Failed to add security info: {}", securityInfo, e);
|
||||
}
|
||||
doPut(securityInfo);
|
||||
return securityInfo != null ? securityInfo.getSecurityInfo() : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(TbLwM2MSecurityInfo tbSecurityInfo) throws NonUniqueSecurityInfoException {
|
||||
securityStore.put(tbSecurityInfo);
|
||||
private void doPut(TbLwM2MSecurityInfo securityInfo) {
|
||||
if (securityInfo != null) {
|
||||
try {
|
||||
securityStore.put(securityInfo);
|
||||
} catch (NonUniqueSecurityInfoException e) {
|
||||
log.trace("Failed to add security info: {}", securityInfo, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(String endpoint) {
|
||||
//TODO: Make sure we delay removal of security store from endpoint due to reg/unreg race condition.
|
||||
// securityStore.remove(endpoint);
|
||||
public void putX509(TbLwM2MSecurityInfo securityInfo) throws NonUniqueSecurityInfoException {
|
||||
securityStore.put(securityInfo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerX509(String endpoint, String registrationId) {
|
||||
endpointRegistrations.computeIfAbsent(endpoint, ep -> new HashSet<>()).add(registrationId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(String endpoint, String registrationId) {
|
||||
Set<String> epRegistrationIds = endpointRegistrations.get(endpoint);
|
||||
boolean shouldRemove;
|
||||
if (epRegistrationIds == null) {
|
||||
shouldRemove = true;
|
||||
} else {
|
||||
epRegistrationIds.remove(registrationId);
|
||||
shouldRemove = epRegistrationIds.isEmpty();
|
||||
}
|
||||
if (shouldRemove) {
|
||||
securityStore.remove(endpoint);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -51,7 +51,7 @@ public class TbLwM2mStoreFactory {
|
||||
}
|
||||
|
||||
@Bean
|
||||
private TbEditableSecurityStore securityStore() {
|
||||
private TbMainSecurityStore securityStore() {
|
||||
return new TbLwM2mSecurityStore(redisConfiguration.isPresent() && useRedis ?
|
||||
new TbLwM2mRedisSecurityStore(redisConfiguration.get().redisConnectionFactory()) : new TbInMemorySecurityStore(), validator);
|
||||
}
|
||||
|
||||
@ -0,0 +1,29 @@
|
||||
/**
|
||||
* Copyright © 2016-2021 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.transport.lwm2m.server.store;
|
||||
|
||||
import org.eclipse.leshan.server.security.NonUniqueSecurityInfoException;
|
||||
import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo;
|
||||
|
||||
public interface TbMainSecurityStore extends TbSecurityStore {
|
||||
|
||||
void putX509(TbLwM2MSecurityInfo tbSecurityInfo) throws NonUniqueSecurityInfoException;
|
||||
|
||||
void registerX509(String endpoint, String registrationId);
|
||||
|
||||
void remove(String endpoint, String registrationId);
|
||||
|
||||
}
|
||||
@ -212,7 +212,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
|
||||
}
|
||||
logService.log(lwM2MClient, LOG_LWM2M_INFO + ": Client registered with registration id: " + registration.getId());
|
||||
SessionInfoProto sessionInfo = lwM2MClient.getSession();
|
||||
transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo));
|
||||
transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo, transportService));
|
||||
log.warn("40) sessionId [{}] Registering rpc subscription after Registration client", new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()));
|
||||
TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder()
|
||||
.setSessionInfo(sessionInfo)
|
||||
@ -888,7 +888,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
|
||||
*/
|
||||
private void reportActivityAndRegister(SessionInfoProto sessionInfo) {
|
||||
if (sessionInfo != null && transportService.reportActivity(sessionInfo) == null) {
|
||||
transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo));
|
||||
transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, attributesService, rpcHandler, sessionInfo, transportService));
|
||||
this.reportActivitySubscription(sessionInfo);
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.gson.JsonParseException;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
|
||||
@ -47,8 +48,9 @@ import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.DeviceTransportType;
|
||||
import org.thingsboard.server.common.data.TransportPayloadType;
|
||||
import org.thingsboard.server.common.data.device.profile.MqttTopics;
|
||||
import org.thingsboard.server.common.data.ota.OtaPackageType;
|
||||
import org.thingsboard.server.common.data.id.OtaPackageId;
|
||||
import org.thingsboard.server.common.data.ota.OtaPackageType;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
import org.thingsboard.server.common.msg.EncryptionUtil;
|
||||
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
|
||||
import org.thingsboard.server.common.transport.SessionMsgListener;
|
||||
@ -813,7 +815,31 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
|
||||
log.trace("[{}] Received RPC command to device", sessionId);
|
||||
try {
|
||||
deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
|
||||
deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest)
|
||||
.ifPresent(payload -> {
|
||||
ChannelFuture channelFuture = deviceSessionCtx.getChannel().writeAndFlush(payload);
|
||||
if (rpcRequest.getPersisted()) {
|
||||
channelFuture.addListener(future -> {
|
||||
RpcStatus status;
|
||||
Throwable t = future.cause();
|
||||
if (t != null) {
|
||||
log.error("Failed delivering RPC command to device!", t);
|
||||
status = RpcStatus.FAILED;
|
||||
} else if (rpcRequest.getOneway()) {
|
||||
status = RpcStatus.SUCCESSFUL;
|
||||
} else {
|
||||
status = RpcStatus.DELIVERED;
|
||||
}
|
||||
TransportProtos.ToDevicePersistedRpcResponseMsg msg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
|
||||
.setRequestId(rpcRequest.getRequestId())
|
||||
.setRequestIdLSB(rpcRequest.getRequestIdLSB())
|
||||
.setRequestIdMSB(rpcRequest.getRequestIdMSB())
|
||||
.setStatus(status.name())
|
||||
.build();
|
||||
transportService.process(deviceSessionCtx.getSessionInfo(), msg, TransportServiceCallback.EMPTY);
|
||||
});
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e);
|
||||
}
|
||||
|
||||
@ -15,9 +15,13 @@
|
||||
*/
|
||||
package org.thingsboard.server.transport.mqtt.session;
|
||||
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
import org.thingsboard.server.common.transport.SessionMsgListener;
|
||||
import org.thingsboard.server.common.transport.TransportService;
|
||||
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
||||
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
|
||||
@ -32,9 +36,11 @@ import java.util.concurrent.ConcurrentMap;
|
||||
public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener {
|
||||
|
||||
private final GatewaySessionHandler parent;
|
||||
private final TransportService transportService;
|
||||
|
||||
public GatewayDeviceSessionCtx(GatewaySessionHandler parent, TransportDeviceInfo deviceInfo,
|
||||
DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
|
||||
DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap,
|
||||
TransportService transportService) {
|
||||
super(UUID.randomUUID(), mqttQoSMap);
|
||||
this.parent = parent;
|
||||
setSessionInfo(SessionInfoProto.newBuilder()
|
||||
@ -56,6 +62,7 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
|
||||
.build());
|
||||
setDeviceInfo(deviceInfo);
|
||||
setDeviceProfile(deviceProfile);
|
||||
this.transportService = transportService;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -89,7 +96,32 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
|
||||
@Override
|
||||
public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg request) {
|
||||
try {
|
||||
parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(parent::writeAndFlush);
|
||||
parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(
|
||||
payload -> {
|
||||
ChannelFuture channelFuture = parent.writeAndFlush(payload);
|
||||
if (request.getPersisted()) {
|
||||
channelFuture.addListener(future -> {
|
||||
RpcStatus status;
|
||||
Throwable t = future.cause();
|
||||
if (t != null) {
|
||||
log.error("Failed delivering RPC command to device!", t);
|
||||
status = RpcStatus.FAILED;
|
||||
} else if (request.getOneway()) {
|
||||
status = RpcStatus.SUCCESSFUL;
|
||||
} else {
|
||||
status = RpcStatus.DELIVERED;
|
||||
}
|
||||
TransportProtos.ToDevicePersistedRpcResponseMsg msg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
|
||||
.setRequestId(request.getRequestId())
|
||||
.setRequestIdLSB(request.getRequestIdLSB())
|
||||
.setRequestIdMSB(request.getRequestIdMSB())
|
||||
.setStatus(status.name())
|
||||
.build();
|
||||
transportService.process(getSessionInfo(), msg, TransportServiceCallback.EMPTY);
|
||||
});
|
||||
}
|
||||
}
|
||||
);
|
||||
} catch (Exception e) {
|
||||
log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
|
||||
}
|
||||
|
||||
@ -28,6 +28,7 @@ import com.google.gson.JsonSyntaxException;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import com.google.protobuf.ProtocolStringList;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttPublishMessage;
|
||||
@ -188,8 +189,8 @@ public class GatewaySessionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
void writeAndFlush(MqttMessage mqttMessage) {
|
||||
channel.writeAndFlush(mqttMessage);
|
||||
ChannelFuture writeAndFlush(MqttMessage mqttMessage) {
|
||||
return channel.writeAndFlush(mqttMessage);
|
||||
}
|
||||
|
||||
int nextMsgId() {
|
||||
@ -251,7 +252,7 @@ public class GatewaySessionHandler {
|
||||
new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse>() {
|
||||
@Override
|
||||
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
|
||||
GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap);
|
||||
GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
|
||||
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
|
||||
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
|
||||
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
|
||||
|
||||
@ -26,7 +26,9 @@ import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration;
|
||||
import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
import org.thingsboard.server.common.transport.SessionMsgListener;
|
||||
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
||||
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
|
||||
@ -139,6 +141,21 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
|
||||
@Override
|
||||
public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) {
|
||||
snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest);
|
||||
if (toDeviceRequest.getPersisted()) {
|
||||
RpcStatus status;
|
||||
if (toDeviceRequest.getOneway()) {
|
||||
status = RpcStatus.SUCCESSFUL;
|
||||
} else {
|
||||
status = RpcStatus.DELIVERED;
|
||||
}
|
||||
TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
|
||||
.setRequestId(toDeviceRequest.getRequestId())
|
||||
.setRequestIdLSB(toDeviceRequest.getRequestIdLSB())
|
||||
.setRequestIdMSB(toDeviceRequest.getRequestIdMSB())
|
||||
.setStatus(status.name())
|
||||
.build();
|
||||
snmpTransportContext.getTransportService().process(getSessionInfo(), responseMsg, TransportServiceCallback.EMPTY);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.DeviceTransportType;
|
||||
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
|
||||
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
|
||||
import org.thingsboard.server.common.transport.service.SessionMetaData;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
|
||||
@ -109,6 +110,8 @@ public interface TransportService {
|
||||
|
||||
void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
|
||||
|
||||
void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDevicePersistedRpcResponseMsg msg, TransportServiceCallback<Void> callback);
|
||||
|
||||
void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);
|
||||
|
||||
void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback);
|
||||
|
||||
@ -557,6 +557,15 @@ public class DefaultTransportService implements TransportService {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDevicePersistedRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
|
||||
if (checkLimits(sessionInfo, msg, callback)) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPersistedRpcResponseMsg(msg).build(),
|
||||
new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback));
|
||||
}
|
||||
}
|
||||
|
||||
private void processTimeout(String requestId) {
|
||||
RpcRequestMetadata data = toServerRpcPendingMap.remove(requestId);
|
||||
if (data != null) {
|
||||
|
||||
@ -506,6 +506,17 @@ public class ModelConstants {
|
||||
public static final String OTA_PACKAGE_DATA_SIZE_COLUMN = "data_size";
|
||||
public static final String OTA_PACKAGE_ADDITIONAL_INFO_COLUMN = ADDITIONAL_INFO_PROPERTY;
|
||||
|
||||
/**
|
||||
* Persisted RPC constants.
|
||||
*/
|
||||
public static final String RPC_TABLE_NAME = "rpc";
|
||||
public static final String RPC_TENANT_ID_COLUMN = TENANT_ID_COLUMN;
|
||||
public static final String RPC_DEVICE_ID = "device_id";
|
||||
public static final String RPC_EXPIRATION_TIME = "expiration_time";
|
||||
public static final String RPC_REQUEST = "request";
|
||||
public static final String RPC_RESPONSE = "response";
|
||||
public static final String RPC_STATUS = "status";
|
||||
|
||||
/**
|
||||
* Edge constants.
|
||||
*/
|
||||
|
||||
@ -0,0 +1,102 @@
|
||||
/**
|
||||
* Copyright © 2016-2021 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.model.sql;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import org.hibernate.annotations.Type;
|
||||
import org.hibernate.annotations.TypeDef;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.RpcId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.rpc.Rpc;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
import org.thingsboard.server.dao.model.BaseEntity;
|
||||
import org.thingsboard.server.dao.model.BaseSqlEntity;
|
||||
import org.thingsboard.server.dao.util.mapping.JsonStringType;
|
||||
|
||||
import javax.persistence.Column;
|
||||
import javax.persistence.Entity;
|
||||
import javax.persistence.EnumType;
|
||||
import javax.persistence.Enumerated;
|
||||
import javax.persistence.Table;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.thingsboard.server.dao.model.ModelConstants.RPC_DEVICE_ID;
|
||||
import static org.thingsboard.server.dao.model.ModelConstants.RPC_EXPIRATION_TIME;
|
||||
import static org.thingsboard.server.dao.model.ModelConstants.RPC_REQUEST;
|
||||
import static org.thingsboard.server.dao.model.ModelConstants.RPC_RESPONSE;
|
||||
import static org.thingsboard.server.dao.model.ModelConstants.RPC_STATUS;
|
||||
import static org.thingsboard.server.dao.model.ModelConstants.RPC_TABLE_NAME;
|
||||
import static org.thingsboard.server.dao.model.ModelConstants.RPC_TENANT_ID_COLUMN;
|
||||
|
||||
@Data
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Entity
|
||||
@TypeDef(name = "json", typeClass = JsonStringType.class)
|
||||
@Table(name = RPC_TABLE_NAME)
|
||||
public class RpcEntity extends BaseSqlEntity<Rpc> implements BaseEntity<Rpc> {
|
||||
|
||||
@Column(name = RPC_TENANT_ID_COLUMN)
|
||||
private UUID tenantId;
|
||||
|
||||
@Column(name = RPC_DEVICE_ID)
|
||||
private UUID deviceId;
|
||||
|
||||
@Column(name = RPC_EXPIRATION_TIME)
|
||||
private long expirationTime;
|
||||
|
||||
@Type(type = "json")
|
||||
@Column(name = RPC_REQUEST)
|
||||
private JsonNode request;
|
||||
|
||||
@Type(type = "json")
|
||||
@Column(name = RPC_RESPONSE)
|
||||
private JsonNode response;
|
||||
|
||||
@Enumerated(EnumType.STRING)
|
||||
@Column(name = RPC_STATUS)
|
||||
private RpcStatus status;
|
||||
|
||||
public RpcEntity() {
|
||||
super();
|
||||
}
|
||||
|
||||
public RpcEntity(Rpc rpc) {
|
||||
this.setUuid(rpc.getUuidId());
|
||||
this.createdTime = rpc.getCreatedTime();
|
||||
this.tenantId = rpc.getTenantId().getId();
|
||||
this.deviceId = rpc.getDeviceId().getId();
|
||||
this.expirationTime = rpc.getExpirationTime();
|
||||
this.request = rpc.getRequest();
|
||||
this.response = rpc.getResponse();
|
||||
this.status = rpc.getStatus();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Rpc toData() {
|
||||
Rpc rpc = new Rpc(new RpcId(id));
|
||||
rpc.setCreatedTime(createdTime);
|
||||
rpc.setTenantId(new TenantId(tenantId));
|
||||
rpc.setDeviceId(new DeviceId(deviceId));
|
||||
rpc.setExpirationTime(expirationTime);
|
||||
rpc.setRequest(request);
|
||||
rpc.setResponse(response);
|
||||
rpc.setStatus(status);
|
||||
return rpc;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,100 @@
|
||||
/**
|
||||
* Copyright © 2016-2021 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.rpc;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.RpcId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.rpc.Rpc;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
import org.thingsboard.server.dao.service.PaginatedRemover;
|
||||
|
||||
import static org.thingsboard.server.dao.service.Validator.validateId;
|
||||
import static org.thingsboard.server.dao.service.Validator.validatePageLink;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class BaseRpcService implements RpcService {
|
||||
public static final String INCORRECT_TENANT_ID = "Incorrect tenantId ";
|
||||
public static final String INCORRECT_RPC_ID = "Incorrect rpcId ";
|
||||
|
||||
private final RpcDao rpcDao;
|
||||
|
||||
@Override
|
||||
public Rpc save(Rpc rpc) {
|
||||
log.trace("Executing save, [{}]", rpc);
|
||||
return rpcDao.save(rpc.getTenantId(), rpc);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteRpc(TenantId tenantId, RpcId rpcId) {
|
||||
log.trace("Executing deleteRpc, tenantId [{}], rpcId [{}]", tenantId, rpcId);
|
||||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
|
||||
validateId(rpcId, INCORRECT_RPC_ID + rpcId);
|
||||
rpcDao.removeById(tenantId, rpcId.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteAllRpcByTenantId(TenantId tenantId) {
|
||||
log.trace("Executing deleteAllRpcByTenantId, tenantId [{}]", tenantId);
|
||||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
|
||||
tenantRpcRemover.removeEntities(tenantId, tenantId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Rpc findById(TenantId tenantId, RpcId rpcId) {
|
||||
log.trace("Executing findById, tenantId [{}], rpcId [{}]", tenantId, rpcId);
|
||||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
|
||||
validateId(rpcId, INCORRECT_RPC_ID + rpcId);
|
||||
return rpcDao.findById(tenantId, rpcId.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Rpc> findRpcByIdAsync(TenantId tenantId, RpcId rpcId) {
|
||||
log.trace("Executing findRpcByIdAsync, tenantId [{}], rpcId: [{}]", tenantId, rpcId);
|
||||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
|
||||
validateId(rpcId, INCORRECT_RPC_ID + rpcId);
|
||||
return rpcDao.findByIdAsync(tenantId, rpcId.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<Rpc> findAllByDeviceIdAndStatus(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) {
|
||||
log.trace("Executing findAllByDeviceIdAndStatus, tenantId [{}], deviceId [{}], rpcStatus [{}], pageLink [{}]", tenantId, deviceId, rpcStatus, pageLink);
|
||||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
|
||||
validatePageLink(pageLink);
|
||||
return rpcDao.findAllByDeviceId(tenantId, deviceId, rpcStatus, pageLink);
|
||||
}
|
||||
|
||||
private PaginatedRemover<TenantId, Rpc> tenantRpcRemover =
|
||||
new PaginatedRemover<>() {
|
||||
@Override
|
||||
protected PageData<Rpc> findEntities(TenantId tenantId, TenantId id, PageLink pageLink) {
|
||||
return rpcDao.findAllRpcByTenantId(id, pageLink);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void removeEntity(TenantId tenantId, Rpc entity) {
|
||||
deleteRpc(tenantId, entity.getId());
|
||||
}
|
||||
};
|
||||
}
|
||||
32
dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java
Normal file
32
dao/src/main/java/org/thingsboard/server/dao/rpc/RpcDao.java
Normal file
@ -0,0 +1,32 @@
|
||||
/**
|
||||
* Copyright © 2016-2021 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.rpc;
|
||||
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.rpc.Rpc;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
import org.thingsboard.server.dao.Dao;
|
||||
|
||||
public interface RpcDao extends Dao<Rpc> {
|
||||
PageData<Rpc> findAllByDeviceId(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink);
|
||||
|
||||
PageData<Rpc> findAllRpcByTenantId(TenantId tenantId, PageLink pageLink);
|
||||
|
||||
Long deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime);
|
||||
}
|
||||
@ -0,0 +1,66 @@
|
||||
/**
|
||||
* Copyright © 2016-2021 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.rpc;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.repository.CrudRepository;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.rpc.Rpc;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.model.sql.RpcEntity;
|
||||
import org.thingsboard.server.dao.rpc.RpcDao;
|
||||
import org.thingsboard.server.dao.sql.JpaAbstractDao;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@AllArgsConstructor
|
||||
public class JpaRpcDao extends JpaAbstractDao<RpcEntity, Rpc> implements RpcDao {
|
||||
|
||||
private final RpcRepository rpcRepository;
|
||||
|
||||
@Override
|
||||
protected Class<RpcEntity> getEntityClass() {
|
||||
return RpcEntity.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CrudRepository<RpcEntity, UUID> getCrudRepository() {
|
||||
return rpcRepository;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<Rpc> findAllByDeviceId(TenantId tenantId, DeviceId deviceId, RpcStatus rpcStatus, PageLink pageLink) {
|
||||
return DaoUtil.toPageData(rpcRepository.findAllByTenantIdAndDeviceIdAndStatus(tenantId.getId(), deviceId.getId(), rpcStatus, DaoUtil.toPageable(pageLink)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<Rpc> findAllRpcByTenantId(TenantId tenantId, PageLink pageLink) {
|
||||
return DaoUtil.toPageData(rpcRepository.findAllByTenantId(tenantId.getId(), DaoUtil.toPageable(pageLink)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long deleteOutdatedRpcByTenantId(TenantId tenantId, Long expirationTime) {
|
||||
return rpcRepository.deleteOutdatedRpcByTenantId(tenantId.getId(), expirationTime);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,36 @@
|
||||
/**
|
||||
* Copyright © 2016-2021 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.rpc;
|
||||
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.CrudRepository;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
import org.thingsboard.server.dao.model.sql.RpcEntity;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
public interface RpcRepository extends CrudRepository<RpcEntity, UUID> {
|
||||
Page<RpcEntity> findAllByTenantIdAndDeviceIdAndStatus(UUID tenantId, UUID deviceId, RpcStatus status, Pageable pageable);
|
||||
|
||||
Page<RpcEntity> findAllByTenantId(UUID tenantId, Pageable pageable);
|
||||
|
||||
@Query(value = "WITH deleted AS (DELETE FROM rpc WHERE (tenant_id = :tenantId AND created_time < :expirationTime) IS TRUE RETURNING *) SELECT count(*) FROM deleted",
|
||||
nativeQuery = true)
|
||||
Long deleteOutdatedRpcByTenantId(@Param("tenantId") UUID tenantId, @Param("expirationTime") Long expirationTime);
|
||||
}
|
||||
@ -37,6 +37,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService;
|
||||
import org.thingsboard.server.dao.exception.DataValidationException;
|
||||
import org.thingsboard.server.dao.ota.OtaPackageService;
|
||||
import org.thingsboard.server.dao.resource.ResourceService;
|
||||
import org.thingsboard.server.dao.rpc.RpcService;
|
||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||
import org.thingsboard.server.dao.service.DataValidator;
|
||||
import org.thingsboard.server.dao.service.PaginatedRemover;
|
||||
@ -96,6 +97,9 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe
|
||||
@Autowired
|
||||
private OtaPackageService otaPackageService;
|
||||
|
||||
@Autowired
|
||||
private RpcService rpcService;
|
||||
|
||||
@Override
|
||||
public Tenant findTenantById(TenantId tenantId) {
|
||||
log.trace("Executing findTenantById [{}]", tenantId);
|
||||
@ -151,6 +155,7 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe
|
||||
apiUsageStateService.deleteApiUsageStateByTenantId(tenantId);
|
||||
resourceService.deleteResourcesByTenantId(tenantId);
|
||||
otaPackageService.deleteOtaPackagesByTenantId(tenantId);
|
||||
rpcService.deleteAllRpcByTenantId(tenantId);
|
||||
tenantDao.removeById(tenantId, tenantId.getId());
|
||||
deleteEntityRelations(tenantId, tenantId);
|
||||
}
|
||||
|
||||
@ -567,3 +567,14 @@ CREATE TABLE IF NOT EXISTS edge_event (
|
||||
tenant_id uuid,
|
||||
ts bigint NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS rpc (
|
||||
id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY,
|
||||
created_time bigint NOT NULL,
|
||||
tenant_id uuid NOT NULL,
|
||||
device_id uuid NOT NULL,
|
||||
expiration_time bigint NOT NULL,
|
||||
request varchar(10000000) NOT NULL,
|
||||
response varchar(10000000),
|
||||
status varchar(255) NOT NULL
|
||||
);
|
||||
|
||||
@ -46,3 +46,4 @@ CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribu
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id);
|
||||
|
||||
@ -605,6 +605,17 @@ CREATE TABLE IF NOT EXISTS edge_event (
|
||||
ts bigint NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS rpc (
|
||||
id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY,
|
||||
created_time bigint NOT NULL,
|
||||
tenant_id uuid NOT NULL,
|
||||
device_id uuid NOT NULL,
|
||||
expiration_time bigint NOT NULL,
|
||||
request varchar(10000000) NOT NULL,
|
||||
response varchar(10000000),
|
||||
status varchar(255) NOT NULL
|
||||
);
|
||||
|
||||
CREATE OR REPLACE PROCEDURE cleanup_events_by_ttl(IN ttl bigint, IN debug_ttl bigint, INOUT deleted bigint)
|
||||
LANGUAGE plpgsql AS
|
||||
$$
|
||||
|
||||
@ -36,4 +36,5 @@ DROP TABLE IF EXISTS resource;
|
||||
DROP TABLE IF EXISTS ota_package;
|
||||
DROP TABLE IF EXISTS edge;
|
||||
DROP TABLE IF EXISTS edge_event;
|
||||
DROP TABLE IF EXISTS rpc;
|
||||
DROP FUNCTION IF EXISTS to_uuid;
|
||||
|
||||
@ -37,3 +37,4 @@ DROP TABLE IF EXISTS resource;
|
||||
DROP TABLE IF EXISTS firmware;
|
||||
DROP TABLE IF EXISTS edge;
|
||||
DROP TABLE IF EXISTS edge_event;
|
||||
DROP TABLE IF EXISTS rpc;
|
||||
|
||||
@ -2378,7 +2378,7 @@ public class RestClient implements ClientHttpRequestInterceptor, Closeable {
|
||||
|
||||
public void setUserCredentialsEnabled(UserId userId, boolean userCredentialsEnabled) {
|
||||
restTemplate.postForLocation(
|
||||
baseURL + "/api/user/{userId}/userCredentialsEnabled?serCredentialsEnabled={serCredentialsEnabled}",
|
||||
baseURL + "/api/user/{userId}/userCredentialsEnabled?userCredentialsEnabled={userCredentialsEnabled}",
|
||||
null,
|
||||
userId.getId(),
|
||||
userCredentialsEnabled);
|
||||
|
||||
@ -35,6 +35,7 @@ public final class RuleEngineDeviceRpcRequest {
|
||||
private final UUID requestUUID;
|
||||
private final String originServiceId;
|
||||
private final boolean oneway;
|
||||
private final boolean persisted;
|
||||
private final String method;
|
||||
private final String body;
|
||||
private final long expirationTime;
|
||||
|
||||
@ -33,8 +33,8 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
|
||||
type = ComponentType.FILTER,
|
||||
name = "message type switch",
|
||||
configClazz = EmptyNodeConfiguration.class,
|
||||
relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "Activity Event", "Inactivity Event",
|
||||
"Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned",
|
||||
relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed",
|
||||
"Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned",
|
||||
"Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant",
|
||||
"Timeseries Updated", "Timeseries Deleted"},
|
||||
nodeDescription = "Route incoming messages by Message Type",
|
||||
@ -95,6 +95,16 @@ public class TbMsgTypeSwitchNode implements TbNode {
|
||||
relationType = "Timeseries Updated";
|
||||
} else if (msg.getType().equals(DataConstants.TIMESERIES_DELETED)) {
|
||||
relationType = "Timeseries Deleted";
|
||||
} else if (msg.getType().equals(DataConstants.RPC_QUEUED)) {
|
||||
relationType = "RPC Queued";
|
||||
} else if (msg.getType().equals(DataConstants.RPC_DELIVERED)) {
|
||||
relationType = "RPC Delivered";
|
||||
} else if (msg.getType().equals(DataConstants.RPC_SUCCESSFUL)) {
|
||||
relationType = "RPC Successful";
|
||||
} else if (msg.getType().equals(DataConstants.RPC_TIMEOUT)) {
|
||||
relationType = "RPC Timeout";
|
||||
} else if (msg.getType().equals(DataConstants.RPC_FAILED)) {
|
||||
relationType = "RPC Failed";
|
||||
} else {
|
||||
relationType = "Other";
|
||||
}
|
||||
|
||||
@ -81,6 +81,9 @@ public class TbSendRPCRequestNode implements TbNode {
|
||||
tmp = msg.getMetaData().getValue("oneway");
|
||||
boolean oneway = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp);
|
||||
|
||||
tmp = msg.getMetaData().getValue("persisted");
|
||||
boolean persisted = !StringUtils.isEmpty(tmp) && Boolean.parseBoolean(tmp);
|
||||
|
||||
tmp = msg.getMetaData().getValue("requestUUID");
|
||||
UUID requestUUID = !StringUtils.isEmpty(tmp) ? UUID.fromString(tmp) : Uuids.timeBased();
|
||||
tmp = msg.getMetaData().getValue("originServiceId");
|
||||
@ -108,6 +111,7 @@ public class TbSendRPCRequestNode implements TbNode {
|
||||
.originServiceId(originServiceId)
|
||||
.expirationTime(expirationTime)
|
||||
.restApiCall(restApiCall)
|
||||
.persisted(persisted)
|
||||
.build();
|
||||
|
||||
ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> {
|
||||
|
||||
@ -196,6 +196,18 @@
|
||||
{{ 'tenant-profile.alarms-ttl-days-days-range' | translate}}
|
||||
</mat-error>
|
||||
</mat-form-field>
|
||||
<mat-form-field class="mat-block">
|
||||
<mat-label translate>tenant-profile.rpc-ttl-days</mat-label>
|
||||
<input matInput required min="0" step="1"
|
||||
formControlName="rpcTtlDays"
|
||||
type="number">
|
||||
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('rpcTtlDays').hasError('required')">
|
||||
{{ 'tenant-profile.rpc-ttl-days-required' | translate}}
|
||||
</mat-error>
|
||||
<mat-error *ngIf="defaultTenantProfileConfigurationFormGroup.get('rpcTtlDays').hasError('min')">
|
||||
{{ 'tenant-profile.rpc-ttl-days-days-range' | translate}}
|
||||
</mat-error>
|
||||
</mat-form-field>
|
||||
<mat-form-field class="mat-block">
|
||||
<mat-label translate>tenant-profile.max-rule-node-executions-per-message</mat-label>
|
||||
<input matInput required min="0" step="1"
|
||||
|
||||
@ -77,7 +77,8 @@ export class DefaultTenantProfileConfigurationComponent implements ControlValueA
|
||||
maxSms: [null, [Validators.required, Validators.min(0)]],
|
||||
maxCreatedAlarms: [null, [Validators.required, Validators.min(0)]],
|
||||
defaultStorageTtlDays: [null, [Validators.required, Validators.min(0)]],
|
||||
alarmsTtlDays: [null, [Validators.required, Validators.min(0)]]
|
||||
alarmsTtlDays: [null, [Validators.required, Validators.min(0)]],
|
||||
rpcTtlDays: [null, [Validators.required, Validators.min(0)]]
|
||||
});
|
||||
this.defaultTenantProfileConfigurationFormGroup.valueChanges.subscribe(() => {
|
||||
this.updateModel();
|
||||
|
||||
@ -352,7 +352,12 @@ export enum MessageType {
|
||||
ATTRIBUTES_UPDATED = 'ATTRIBUTES_UPDATED',
|
||||
ATTRIBUTES_DELETED = 'ATTRIBUTES_DELETED',
|
||||
TIMESERIES_UPDATED = 'TIMESERIES_UPDATED',
|
||||
TIMESERIES_DELETED = 'TIMESERIES_DELETED'
|
||||
TIMESERIES_DELETED = 'TIMESERIES_DELETED',
|
||||
RPC_QUEUED = 'RPC_QUEUED',
|
||||
RPC_DELIVERED = 'RPC_DELIVERED',
|
||||
RPC_SUCCESSFUL = 'RPC_SUCCESSFUL',
|
||||
RPC_TIMEOUT = 'RPC_TIMEOUT',
|
||||
RPC_FAILED = 'RPC_FAILED'
|
||||
}
|
||||
|
||||
export const messageTypeNames = new Map<MessageType, string>(
|
||||
@ -373,7 +378,12 @@ export const messageTypeNames = new Map<MessageType, string>(
|
||||
[MessageType.ATTRIBUTES_UPDATED, 'Attributes Updated'],
|
||||
[MessageType.ATTRIBUTES_DELETED, 'Attributes Deleted'],
|
||||
[MessageType.TIMESERIES_UPDATED, 'Timeseries Updated'],
|
||||
[MessageType.TIMESERIES_DELETED, 'Timeseries Deleted']
|
||||
[MessageType.TIMESERIES_DELETED, 'Timeseries Deleted'],
|
||||
[MessageType.RPC_QUEUED, 'RPC Queued'],
|
||||
[MessageType.RPC_DELIVERED, 'RPC Delivered'],
|
||||
[MessageType.RPC_SUCCESSFUL, 'RPC Successful'],
|
||||
[MessageType.RPC_TIMEOUT, 'RPC Timeout'],
|
||||
[MessageType.RPC_FAILED, 'RPC Failed']
|
||||
]
|
||||
);
|
||||
|
||||
|
||||
@ -53,6 +53,7 @@ export interface DefaultTenantProfileConfiguration {
|
||||
|
||||
defaultStorageTtlDays: number;
|
||||
alarmsTtlDays: number;
|
||||
rpcTtlDays: number;
|
||||
}
|
||||
|
||||
export type TenantProfileConfigurations = DefaultTenantProfileConfiguration;
|
||||
@ -85,7 +86,8 @@ export function createTenantProfileConfiguration(type: TenantProfileType): Tenan
|
||||
maxSms: 0,
|
||||
maxCreatedAlarms: 0,
|
||||
defaultStorageTtlDays: 0,
|
||||
alarmsTtlDays: 0
|
||||
alarmsTtlDays: 0,
|
||||
rpcTtlDays: 0
|
||||
};
|
||||
configuration = {...defaultConfiguration, type: TenantProfileType.DEFAULT};
|
||||
break;
|
||||
|
||||
@ -2638,6 +2638,9 @@
|
||||
"alarms-ttl-days": "Alarms TTL days (0 - unlimited)",
|
||||
"alarms-ttl-days-required": "Alarms TTL days required",
|
||||
"alarms-ttl-days-days-range": "Alarms TTL days can't be negative",
|
||||
"rpc-ttl-days": "RPC TTL days (0 - unlimited)",
|
||||
"rpc-ttl-days-required": "RPC TTL days required",
|
||||
"rpc-ttl-days-days-range": "RPC TTL days can't be negative",
|
||||
"max-rule-node-executions-per-message": "Maximum number of rule node executions per message (0 - unlimited)",
|
||||
"max-rule-node-executions-per-message-required": "Maximum number of rule node executions per message is required.",
|
||||
"max-rule-node-executions-per-message-range": "Maximum number of rule node executions per message can't be negative",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user