added additionalInfo to the rpc

This commit is contained in:
YevhenBondarenko 2021-08-04 18:03:54 +03:00 committed by Andrew Shvayka
parent 9d651a7252
commit 373f2f9f0e
14 changed files with 28 additions and 2 deletions

View File

@ -209,6 +209,7 @@ CREATE TABLE IF NOT EXISTS rpc (
expiration_time bigint NOT NULL,
request varchar(10000000) NOT NULL,
response varchar(10000000),
additional_info varchar(10000000),
status varchar(255) NOT NULL
);

View File

@ -234,6 +234,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
rpc.setExpirationTime(request.getExpirationTime());
rpc.setRequest(JacksonUtil.valueToTree(request));
rpc.setStatus(status);
rpc.setAdditionalInfo(JacksonUtil.valueToTree(request.getAdditionalInfo()));
return systemContext.getTbRpcService().save(tenantId, rpc);
}

View File

@ -90,6 +90,7 @@ public abstract class AbstractRpcController extends BaseController {
long expTime = System.currentTimeMillis() + Math.max(minTimeout, timeout);
UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID();
boolean persisted = rpcRequestBody.has(DataConstants.PERSISTENT) && rpcRequestBody.get(DataConstants.PERSISTENT).asBoolean();
String additionalInfo = JacksonUtil.toString(rpcRequestBody.get(DataConstants.ADDITIONAL_INFO));
accessValidator.validate(currentUser, Operation.RPC_CALL, deviceId, new HttpValidationCallback(response, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
@ -99,7 +100,8 @@ public abstract class AbstractRpcController extends BaseController {
oneWay,
expTime,
body,
persisted
persisted,
additionalInfo
);
deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse, timeoutStatus, noActiveConnectionStatus), currentUser);
}

View File

@ -168,6 +168,8 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
entityNode.put("method", msg.getBody().getMethod());
entityNode.put("params", msg.getBody().getParams());
entityNode.put(DataConstants.ADDITIONAL_INFO, msg.getAdditionalInfo());
try {
TbMsg tbMsg = TbMsg.newMsg(DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), currentUser.getCustomerId(), metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode));
clusterService.pushMsgToRuleEngine(msg.getTenantId(), msg.getDeviceId(), tbMsg, null);

View File

@ -100,7 +100,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
@Override
public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(),
src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted());
src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getAdditionalInfo());
forwardRpcRequestToDeviceActor(request, response -> {
if (src.isRestApiCall()) {
sendRpcResponseToTbCore(src.getOriginServiceId(), response);

View File

@ -36,6 +36,7 @@ public class DataConstants {
public static final String ALARM_CONDITION_REPEATS = "alarmConditionRepeats";
public static final String ALARM_CONDITION_DURATION = "alarmConditionDuration";
public static final String PERSISTENT = "persistent";
public static final String ADDITIONAL_INFO = "additionalInfo";
public static final String COAP_TRANSPORT_NAME = "COAP";
public static final String LWM2M_TRANSPORT_NAME = "LWM2M";
public static final String MQTT_TRANSPORT_NAME = "MQTT";

View File

@ -33,6 +33,7 @@ public class Rpc extends BaseData<RpcId> implements HasTenantId {
private JsonNode request;
private JsonNode response;
private RpcStatus status;
private JsonNode additionalInfo;
public Rpc() {
super();
@ -50,5 +51,6 @@ public class Rpc extends BaseData<RpcId> implements HasTenantId {
this.request = rpc.getRequest();
this.response = rpc.getResponse();
this.status = rpc.getStatus();
this.additionalInfo = rpc.getAdditionalInfo();
}
}

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.common.msg.rpc;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
@ -35,5 +36,7 @@ public class ToDeviceRpcRequest implements Serializable {
private final long expirationTime;
private final ToDeviceRpcRequestBody body;
private final boolean persisted;
@JsonIgnore
private final String additionalInfo;
}

View File

@ -519,6 +519,7 @@ public class ModelConstants {
public static final String RPC_REQUEST = "request";
public static final String RPC_RESPONSE = "response";
public static final String RPC_STATUS = "status";
public static final String RPC_ADDITIONAL_INFO = ADDITIONAL_INFO_PROPERTY;
/**
* Edge constants.

View File

@ -36,6 +36,7 @@ import javax.persistence.Enumerated;
import javax.persistence.Table;
import java.util.UUID;
import static org.thingsboard.server.dao.model.ModelConstants.RPC_ADDITIONAL_INFO;
import static org.thingsboard.server.dao.model.ModelConstants.RPC_DEVICE_ID;
import static org.thingsboard.server.dao.model.ModelConstants.RPC_EXPIRATION_TIME;
import static org.thingsboard.server.dao.model.ModelConstants.RPC_REQUEST;
@ -72,6 +73,10 @@ public class RpcEntity extends BaseSqlEntity<Rpc> implements BaseEntity<Rpc> {
@Column(name = RPC_STATUS)
private RpcStatus status;
@Type(type = "json")
@Column(name = RPC_ADDITIONAL_INFO)
private JsonNode additionalInfo;
public RpcEntity() {
super();
}
@ -85,6 +90,7 @@ public class RpcEntity extends BaseSqlEntity<Rpc> implements BaseEntity<Rpc> {
this.request = rpc.getRequest();
this.response = rpc.getResponse();
this.status = rpc.getStatus();
this.additionalInfo = rpc.getAdditionalInfo();
}
@Override
@ -97,6 +103,7 @@ public class RpcEntity extends BaseSqlEntity<Rpc> implements BaseEntity<Rpc> {
rpc.setRequest(request);
rpc.setResponse(response);
rpc.setStatus(status);
rpc.setAdditionalInfo(additionalInfo);
return rpc;
}
}

View File

@ -582,5 +582,6 @@ CREATE TABLE IF NOT EXISTS rpc (
expiration_time bigint NOT NULL,
request varchar(10000000) NOT NULL,
response varchar(10000000),
additional_info varchar(10000000),
status varchar(255) NOT NULL
);

View File

@ -616,6 +616,7 @@ CREATE TABLE IF NOT EXISTS rpc (
expiration_time bigint NOT NULL,
request varchar(10000000) NOT NULL,
response varchar(10000000),
additional_info varchar(10000000),
status varchar(255) NOT NULL
);

View File

@ -40,5 +40,6 @@ public final class RuleEngineDeviceRpcRequest {
private final String body;
private final long expirationTime;
private final boolean restApiCall;
private final String additionalInfo;
}

View File

@ -100,6 +100,8 @@ public class TbSendRPCRequestNode implements TbNode {
params = gson.toJson(paramsEl);
}
String additionalInfo = gson.toJson(json.get(DataConstants.ADDITIONAL_INFO));
RuleEngineDeviceRpcRequest request = RuleEngineDeviceRpcRequest.builder()
.oneway(oneway)
.method(json.get("method").getAsString())
@ -112,6 +114,7 @@ public class TbSendRPCRequestNode implements TbNode {
.expirationTime(expirationTime)
.restApiCall(restApiCall)
.persisted(persisted)
.additionalInfo(additionalInfo)
.build();
ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> {