diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 5042c5b2d8..22e4aae78b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -199,7 +199,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { log.debug("[{}][{}] device is related to edge [{}]. Saving RPC request to edge queue", tenantId, deviceId, edgeId.getId()); saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId()); sent = true; - } else if (!rpcSequenceEnabled || toDeviceRpcPendingMap.isEmpty()) { + } else if (isSendNewRpcAvailable()) { sent = rpcSubscriptions.size() > 0; Set syncSessionSet = new HashSet<>(); rpcSubscriptions.forEach((key, value) -> { @@ -231,6 +231,18 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } + private boolean isSendNewRpcAvailable() { + if (rpcSequenceEnabled) { + for (ToDeviceRpcRequestMetadata rpc : toDeviceRpcPendingMap.values()) { + if (!rpc.isDelivered()) { + return false; + } + } + } + + return true; + } + private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) { Rpc rpc = new Rpc(new RpcId(request.getId())); rpc.setCreatedTime(System.currentTimeMillis()); @@ -347,7 +359,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { .setMethodName(body.getMethod()) .setParams(body.getParams()) .setExpirationTime(request.getExpirationTime()) - .setTimeout(request.getTimeout()) .setRequestIdMSB(request.getId().getMostSignificantBits()) .setRequestIdLSB(request.getId().getLeastSignificantBits()) .setOneway(request.isOneway()) @@ -563,7 +574,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, response); } } finally { - if (!requestMd.isDelivered() && hasError) { + if (hasError) { sendNextPendingRequest(context); } } diff --git a/application/src/main/java/org/thingsboard/server/controller/AbstractRpcController.java b/application/src/main/java/org/thingsboard/server/controller/AbstractRpcController.java index 56325f1cdb..b7dbd8b3d9 100644 --- a/application/src/main/java/org/thingsboard/server/controller/AbstractRpcController.java +++ b/application/src/main/java/org/thingsboard/server/controller/AbstractRpcController.java @@ -88,7 +88,6 @@ public abstract class AbstractRpcController extends BaseController { deviceId, oneWay, expTime, - timeout, body, persisted, additionalInfo diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java index 6791f5dd6c..230f6e759e 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java @@ -101,7 +101,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi @Override public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer consumer) { ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(), - src.isOneway(), src.getExpirationTime(), src.getTimeout(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getAdditionalInfo()); + src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getAdditionalInfo()); forwardRpcRequestToDeviceActor(request, response -> { if (src.isRestApiCall()) { sendRpcResponseToTbCore(src.getOriginServiceId(), response); diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 91408fff41..5b220974ef 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -334,7 +334,6 @@ message ToDeviceRpcRequestMsg { int64 requestIdLSB = 6; bool oneway = 7; bool persisted = 8; - int64 timeout = 9; } message ToDeviceRpcResponseMsg { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index caaaa4cbd3..54ecc31f83 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -87,9 +87,11 @@ public class DataConstants { public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE"; public static final String RPC_QUEUED = "RPC_QUEUED"; + public static final String RPC_SENT = "RPC_SENT"; public static final String RPC_DELIVERED = "RPC_DELIVERED"; public static final String RPC_SUCCESSFUL = "RPC_SUCCESSFUL"; public static final String RPC_TIMEOUT = "RPC_TIMEOUT"; + public static final String RPC_EXPIRED = "RPC_EXPIRED"; public static final String RPC_FAILED = "RPC_FAILED"; public static final String RPC_DELETED = "RPC_DELETED"; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java b/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java index 912304f962..f9bb2b3810 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java @@ -34,7 +34,6 @@ public class ToDeviceRpcRequest implements Serializable { private final DeviceId deviceId; private final boolean oneway; private final long expirationTime; - private final long timeout; private final ToDeviceRpcRequestBody body; private final boolean persisted; @JsonIgnore diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java index d1ee5c29e3..082a94dbe6 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java @@ -193,29 +193,7 @@ public class DefaultCoapClientContext implements CoapClientContext { client.lock(); try { long uplinkTime = client.updateLastUplinkTime(uplinkTs); - long timeout; - if (PowerMode.PSM.equals(powerMode)) { - Long psmActivityTimer = client.getPsmActivityTimer(); - if (psmActivityTimer == null && profileSettings != null) { - psmActivityTimer = profileSettings.getPsmActivityTimer(); - - } - if (psmActivityTimer == null || psmActivityTimer == 0L) { - psmActivityTimer = config.getPsmActivityTimer(); - } - - timeout = psmActivityTimer; - } else { - Long pagingTransmissionWindow = client.getPagingTransmissionWindow(); - if (pagingTransmissionWindow == null && profileSettings != null) { - pagingTransmissionWindow = profileSettings.getPagingTransmissionWindow(); - - } - if (pagingTransmissionWindow == null || pagingTransmissionWindow == 0L) { - pagingTransmissionWindow = config.getPagingTransmissionWindow(); - } - timeout = pagingTransmissionWindow; - } + long timeout = getTimeout(client, powerMode, profileSettings); Future sleepTask = client.getSleepTask(); if (sleepTask != null) { sleepTask.cancel(false); @@ -235,6 +213,33 @@ public class DefaultCoapClientContext implements CoapClientContext { } } + private long getTimeout(TbCoapClientState client, PowerMode powerMode, PowerSavingConfiguration profileSettings) { + long timeout; + if (PowerMode.PSM.equals(powerMode)) { + Long psmActivityTimer = client.getPsmActivityTimer(); + if (psmActivityTimer == null && profileSettings != null) { + psmActivityTimer = profileSettings.getPsmActivityTimer(); + + } + if (psmActivityTimer == null || psmActivityTimer == 0L) { + psmActivityTimer = config.getPsmActivityTimer(); + } + + timeout = psmActivityTimer; + } else { + Long pagingTransmissionWindow = client.getPagingTransmissionWindow(); + if (pagingTransmissionWindow == null && profileSettings != null) { + pagingTransmissionWindow = profileSettings.getPagingTransmissionWindow(); + + } + if (pagingTransmissionWindow == null || pagingTransmissionWindow == 0L) { + pagingTransmissionWindow = config.getPagingTransmissionWindow(); + } + timeout = pagingTransmissionWindow; + } + return timeout; + } + private boolean registerFeatureObservation(TbCoapClientState state, String token, CoapExchange exchange, FeatureType featureType) { state.lock(); try { @@ -526,13 +531,25 @@ public class DefaultCoapClientContext implements CoapClientContext { int requestId = getNextMsgId(); response.setMID(requestId); if (conRequest) { + PowerMode powerMode = state.getPowerMode(); + PowerSavingConfiguration profileSettings = null; + if (powerMode == null) { + var clientProfile = getProfile(state.getProfileId()); + if (clientProfile.isPresent()) { + profileSettings = clientProfile.get().getClientSettings(); + if (profileSettings != null) { + powerMode = profileSettings.getPowerMode(); + } + } + } + transportContext.getRpcAwaitingAck().put(requestId, msg); transportContext.getScheduler().schedule(() -> { TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(requestId); if (rpcRequestMsg != null) { transportService.process(state.getSession(), msg, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY); } - }, Math.max(0, Math.min(msg.getTimeout(), msg.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS); + }, Math.min(getTimeout(state, powerMode, profileSettings), msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> { TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index 11b46696da..1335b6105b 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -68,4 +68,7 @@ public class MqttTransportContext extends TransportContext { @Value("${transport.mqtt.msg_queue_size_per_device_limit:100}") private int messageQueueSizePerDeviceLimit; + @Getter + @Value("${transport.mqtt.timeout:10000}") + private long timeout; } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index a5b9ef9d4a..d8d06a3173 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -857,7 +857,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (msg != null) { transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY); } - }, Math.max(0, Math.min(rpcRequest.getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS); + }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS); } var cf = publish(payload, deviceSessionCtx); cf.addListener(result -> { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java index e9248982bb..afdee27b74 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java @@ -39,7 +39,6 @@ public final class RuleEngineDeviceRpcRequest { private final String method; private final String body; private final long expirationTime; - private final long timeout; private final boolean restApiCall; private final String additionalInfo; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java index cda3269791..6303a888df 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java @@ -33,7 +33,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; type = ComponentType.FILTER, name = "message type switch", configClazz = EmptyNodeConfiguration.class, - relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed", "RPC Deleted", + relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Sent", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Expired", "RPC Failed", "RPC Deleted", "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned", "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant", "Timeseries Updated", "Timeseries Deleted"}, @@ -97,12 +97,16 @@ public class TbMsgTypeSwitchNode implements TbNode { relationType = "Timeseries Deleted"; } else if (msg.getType().equals(DataConstants.RPC_QUEUED)) { relationType = "RPC Queued"; + } else if (msg.getType().equals(DataConstants.RPC_SENT)) { + relationType = "RPC Sent"; } else if (msg.getType().equals(DataConstants.RPC_DELIVERED)) { relationType = "RPC Delivered"; } else if (msg.getType().equals(DataConstants.RPC_SUCCESSFUL)) { relationType = "RPC Successful"; } else if (msg.getType().equals(DataConstants.RPC_TIMEOUT)) { relationType = "RPC Timeout"; + } else if (msg.getType().equals(DataConstants.RPC_EXPIRED)) { + relationType = "RPC Expired"; } else if (msg.getType().equals(DataConstants.RPC_FAILED)) { relationType = "RPC Failed"; } else if (msg.getType().equals(DataConstants.RPC_DELETED)) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java index 79428f03e1..5b40f41857 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java @@ -92,9 +92,6 @@ public class TbSendRPCRequestNode implements TbNode { tmp = msg.getMetaData().getValue(DataConstants.EXPIRATION_TIME); long expirationTime = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : (System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds())); - tmp = msg.getMetaData().getValue(DataConstants.TIMEOUT); - long timeout = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds()); - String params; JsonElement paramsEl = json.get("params"); if (paramsEl.isJsonPrimitive()) { @@ -115,7 +112,6 @@ public class TbSendRPCRequestNode implements TbNode { .requestUUID(requestUUID) .originServiceId(originServiceId) .expirationTime(expirationTime) - .timeout(timeout) .restApiCall(restApiCall) .persisted(persisted) .additionalInfo(additionalInfo)