Merge branch 'develop/3.5' into feature/devices-activity
This commit is contained in:
commit
f431a4f0d5
@ -598,7 +598,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
|
||||
response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> {
|
||||
TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
|
||||
if (rpcRequestMsg != null) {
|
||||
transportService.process(state.getSession(), rpcRequestMsg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
|
||||
transportService.process(state.getSession(), rpcRequestMsg, RpcStatus.DELIVERED, true, TransportServiceCallback.EMPTY);
|
||||
}
|
||||
}, id -> {
|
||||
TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
|
||||
|
||||
@ -65,7 +65,7 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR
|
||||
|
||||
@Override
|
||||
public void onSuccess(R request, T response) {
|
||||
transportService.process(client.getSession(), this.request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
|
||||
transportService.process(client.getSession(), this.request, RpcStatus.DELIVERED, true, TransportServiceCallback.EMPTY);
|
||||
sendRpcReplyOnSuccess(response);
|
||||
if (callback != null) {
|
||||
callback.onSuccess(request, response);
|
||||
|
||||
@ -322,7 +322,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId();
|
||||
TransportProtos.ToDeviceRpcRequestMsg rpcRequest = rpcAwaitingAck.remove(msgId);
|
||||
if (rpcRequest != null) {
|
||||
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
|
||||
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, true, TransportServiceCallback.EMPTY);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
|
||||
@ -117,6 +117,8 @@ public interface TransportService {
|
||||
|
||||
void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
|
||||
|
||||
void process(SessionInfoProto sessionInfo, ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, boolean reportActivity, TransportServiceCallback<Void> callback);
|
||||
|
||||
void process(SessionInfoProto sessionInfo, ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, TransportServiceCallback<Void> callback);
|
||||
|
||||
void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);
|
||||
|
||||
@ -647,6 +647,11 @@ public class DefaultTransportService implements TransportService {
|
||||
|
||||
@Override
|
||||
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, TransportServiceCallback<Void> callback) {
|
||||
process(sessionInfo, msg, rpcStatus, false, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, boolean reportActivity, TransportServiceCallback<Void> callback) {
|
||||
TransportProtos.ToDeviceRpcResponseStatusMsg responseMsg = TransportProtos.ToDeviceRpcResponseStatusMsg.newBuilder()
|
||||
.setRequestId(msg.getRequestId())
|
||||
.setRequestIdLSB(msg.getRequestIdLSB())
|
||||
@ -655,7 +660,9 @@ public class DefaultTransportService implements TransportService {
|
||||
.build();
|
||||
|
||||
if (checkLimits(sessionInfo, responseMsg, callback)) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
if (reportActivity) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
}
|
||||
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setRpcResponseStatusMsg(responseMsg).build(),
|
||||
new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, TransportServiceCallback.EMPTY));
|
||||
}
|
||||
|
||||
@ -35,7 +35,6 @@ import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.client.HttpComponentsAsyncClientHttpRequestFactory;
|
||||
import org.springframework.http.client.Netty4ClientHttpRequestFactory;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
import org.springframework.web.client.AsyncRestTemplate;
|
||||
@ -49,6 +48,7 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
||||
import org.thingsboard.rule.engine.credentials.BasicCredentials;
|
||||
import org.thingsboard.rule.engine.credentials.ClientCredentials;
|
||||
import org.thingsboard.rule.engine.credentials.CredentialsType;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
|
||||
@ -192,7 +192,7 @@ public class TbHttpClient {
|
||||
config.isIgnoreRequestBody()) {
|
||||
entity = new HttpEntity<>(headers);
|
||||
} else {
|
||||
entity = new HttpEntity<>(msg.getData(), headers);
|
||||
entity = new HttpEntity<>(getData(msg), headers);
|
||||
}
|
||||
|
||||
URI uri = buildEncodedUri(endpointUrl);
|
||||
@ -243,6 +243,18 @@ public class TbHttpClient {
|
||||
return uri;
|
||||
}
|
||||
|
||||
private String getData(TbMsg msg) {
|
||||
String data = msg.getData();
|
||||
|
||||
if (config.isTrimDoubleQuotes()) {
|
||||
final String dataBefore = data;
|
||||
data = data.replaceAll("^\"|\"$", "");;
|
||||
log.trace("Trimming double quotes. Before trim: [{}], after trim: [{}]", dataBefore, data);
|
||||
}
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
private TbMsg processResponse(TbContext ctx, TbMsg origMsg, ResponseEntity<String> response) {
|
||||
TbMsgMetaData metaData = origMsg.getMetaData();
|
||||
metaData.putValue(STATUS, response.getStatusCode().name());
|
||||
|
||||
@ -37,8 +37,7 @@ public class TbRestApiCallNodeConfiguration implements NodeConfiguration<TbRestA
|
||||
private int readTimeoutMs;
|
||||
private int maxParallelRequestsCount;
|
||||
private boolean useRedisQueueForMsgPersistence;
|
||||
private boolean trimQueue;
|
||||
private int maxQueueSize;
|
||||
private boolean trimDoubleQuotes;
|
||||
private boolean enableProxy;
|
||||
private boolean useSystemProxyProperties;
|
||||
private String proxyHost;
|
||||
@ -59,7 +58,7 @@ public class TbRestApiCallNodeConfiguration implements NodeConfiguration<TbRestA
|
||||
configuration.setReadTimeoutMs(0);
|
||||
configuration.setMaxParallelRequestsCount(0);
|
||||
configuration.setUseRedisQueueForMsgPersistence(false);
|
||||
configuration.setTrimQueue(false);
|
||||
configuration.setTrimDoubleQuotes(false);
|
||||
configuration.setEnableProxy(false);
|
||||
configuration.setCredentials(new AnonymousCredentials());
|
||||
configuration.setIgnoreRequestBody(false);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user