Merge remote-tracking branch 'ce/master' into jwt-random

This commit is contained in:
Sergey Matvienko 2022-11-14 14:32:02 +01:00
commit 4fc13b7938
17 changed files with 305 additions and 31 deletions

View File

@ -823,6 +823,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
body.put("expirationTime", msg.getExpirationTime()); body.put("expirationTime", msg.getExpirationTime());
body.put("method", msg.getBody().getMethod()); body.put("method", msg.getBody().getMethod());
body.put("params", msg.getBody().getParams()); body.put("params", msg.getBody().getParams());
body.put("persisted", msg.isPersisted());
body.put("retries", msg.getRetries());
body.put("additionalInfo", msg.getAdditionalInfo());
EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.DEVICE, EdgeEventActionType.RPC_CALL, deviceId, body); EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.DEVICE, EdgeEventActionType.RPC_CALL, deviceId, body);

View File

@ -611,7 +611,7 @@ public final class EdgeGrpcSession implements Closeable {
} }
if (uplinkMsg.getDeviceRpcCallMsgCount() > 0) { if (uplinkMsg.getDeviceRpcCallMsgCount() > 0) {
for (DeviceRpcCallMsg deviceRpcCallMsg : uplinkMsg.getDeviceRpcCallMsgList()) { for (DeviceRpcCallMsg deviceRpcCallMsg : uplinkMsg.getDeviceRpcCallMsgList()) {
result.add(ctx.getDeviceProcessor().processDeviceRpcCallResponseFromEdge(edge.getTenantId(), deviceRpcCallMsg)); result.add(ctx.getDeviceProcessor().processDeviceRpcCallFromEdge(edge.getTenantId(), edge, deviceRpcCallMsg));
} }
} }
if (uplinkMsg.getWidgetBundleTypesRequestMsgCount() > 0) { if (uplinkMsg.getWidgetBundleTypesRequestMsgCount() > 0) {

View File

@ -21,13 +21,13 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DeviceRpcCallMsg; import org.thingsboard.server.gen.edge.v1.DeviceRpcCallMsg;
import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg; import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg;
import org.thingsboard.server.gen.edge.v1.RpcRequestMsg; import org.thingsboard.server.gen.edge.v1.RpcRequestMsg;
import org.thingsboard.server.gen.edge.v1.RpcResponseMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
@ -97,25 +97,55 @@ public class DeviceMsgConstructor {
} }
public DeviceRpcCallMsg constructDeviceRpcCallMsg(UUID deviceId, JsonNode body) { public DeviceRpcCallMsg constructDeviceRpcCallMsg(UUID deviceId, JsonNode body) {
int requestId = body.get("requestId").asInt(); DeviceRpcCallMsg.Builder builder = constructDeviceRpcMsg(deviceId, body);
boolean oneway = body.get("oneway").asBoolean(); if (body.has("error") || body.has("response")) {
UUID requestUUID = UUID.fromString(body.get("requestUUID").asText()); RpcResponseMsg.Builder responseBuilder = RpcResponseMsg.newBuilder();
long expirationTime = body.get("expirationTime").asLong(); if (body.has("error")) {
String method = body.get("method").asText(); responseBuilder.setError(body.get("error").asText());
String params = body.get("params").asText(); } else {
responseBuilder.setResponse(body.get("response").asText());
}
builder.setResponseMsg(responseBuilder.build());
} else {
RpcRequestMsg.Builder requestBuilder = RpcRequestMsg.newBuilder();
requestBuilder.setMethod(body.get("method").asText());
requestBuilder.setParams(body.get("params").asText());
builder.setRequestMsg(requestBuilder.build());
}
return builder.build();
}
RpcRequestMsg.Builder requestBuilder = RpcRequestMsg.newBuilder(); private DeviceRpcCallMsg.Builder constructDeviceRpcMsg(UUID deviceId, JsonNode body) {
requestBuilder.setMethod(method);
requestBuilder.setParams(params);
DeviceRpcCallMsg.Builder builder = DeviceRpcCallMsg.newBuilder() DeviceRpcCallMsg.Builder builder = DeviceRpcCallMsg.newBuilder()
.setDeviceIdMSB(deviceId.getMostSignificantBits()) .setDeviceIdMSB(deviceId.getMostSignificantBits())
.setDeviceIdLSB(deviceId.getLeastSignificantBits()) .setDeviceIdLSB(deviceId.getLeastSignificantBits())
.setRequestUuidMSB(requestUUID.getMostSignificantBits()) .setRequestId(body.get("requestId").asInt());
.setRequestUuidLSB(requestUUID.getLeastSignificantBits()) if (body.get("oneway") != null) {
.setRequestId(requestId) builder.setOneway(body.get("oneway").asBoolean());
.setExpirationTime(expirationTime) }
.setOneway(oneway) if (body.get("requestUUID") != null) {
.setRequestMsg(requestBuilder.build()); UUID requestUUID = UUID.fromString(body.get("requestUUID").asText());
return builder.build(); builder.setRequestUuidMSB(requestUUID.getMostSignificantBits())
.setRequestUuidLSB(requestUUID.getLeastSignificantBits());
}
if (body.get("expirationTime") != null) {
builder.setExpirationTime(body.get("expirationTime").asLong());
}
if (body.get("persisted") != null) {
builder.setPersisted(body.get("persisted").asBoolean());
}
if (body.get("retries") != null) {
builder.setRetries(body.get("retries").asInt());
}
if (body.get("additionalInfo") != null) {
builder.setAdditionalInfo(JacksonUtil.toString(body.get("additionalInfo")));
}
if (body.get("serviceId") != null) {
builder.setServiceId(body.get("serviceId").asText());
}
if (body.get("sessionId") != null) {
builder.setSessionId(body.get("sessionId").asText());
}
return builder;
} }
} }

View File

@ -52,6 +52,7 @@ import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsRequestMsg; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsRequestMsg;
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg;
@ -325,8 +326,17 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
return metaData; return metaData;
} }
public ListenableFuture<Void> processDeviceRpcCallResponseFromEdge(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) { public ListenableFuture<Void> processDeviceRpcCallFromEdge(TenantId tenantId, Edge edge, DeviceRpcCallMsg deviceRpcCallMsg) {
log.trace("[{}] processDeviceRpcCallResponseMsg [{}]", tenantId, deviceRpcCallMsg); log.trace("[{}] processDeviceRpcCallFromEdge [{}]", tenantId, deviceRpcCallMsg);
if (deviceRpcCallMsg.hasResponseMsg()) {
return processDeviceRpcResponseFromEdge(tenantId, deviceRpcCallMsg);
} else if (deviceRpcCallMsg.hasRequestMsg()) {
return processDeviceRpcRequestFromEdge(tenantId, edge, deviceRpcCallMsg);
}
return Futures.immediateFuture(null);
}
private ListenableFuture<Void> processDeviceRpcResponseFromEdge(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) {
SettableFuture<Void> futureToSet = SettableFuture.create(); SettableFuture<Void> futureToSet = SettableFuture.create();
UUID requestUuid = new UUID(deviceRpcCallMsg.getRequestUuidMSB(), deviceRpcCallMsg.getRequestUuidLSB()); UUID requestUuid = new UUID(deviceRpcCallMsg.getRequestUuidMSB(), deviceRpcCallMsg.getRequestUuidLSB());
DeviceId deviceId = new DeviceId(new UUID(deviceRpcCallMsg.getDeviceIdMSB(), deviceRpcCallMsg.getDeviceIdLSB())); DeviceId deviceId = new DeviceId(new UUID(deviceRpcCallMsg.getDeviceIdMSB(), deviceRpcCallMsg.getDeviceIdLSB()));
@ -357,6 +367,46 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
return futureToSet; return futureToSet;
} }
private ListenableFuture<Void> processDeviceRpcRequestFromEdge(TenantId tenantId, Edge edge, DeviceRpcCallMsg deviceRpcCallMsg) {
DeviceId deviceId = new DeviceId(new UUID(deviceRpcCallMsg.getDeviceIdMSB(), deviceRpcCallMsg.getDeviceIdLSB()));
try {
TbMsgMetaData metaData = new TbMsgMetaData();
String requestId = Integer.toString(deviceRpcCallMsg.getRequestId());
metaData.putValue("requestId", requestId);
metaData.putValue("serviceId", deviceRpcCallMsg.getServiceId());
metaData.putValue("sessionId", deviceRpcCallMsg.getSessionId());
metaData.putValue(DataConstants.EDGE_ID, edge.getId().toString());
Device device = deviceService.findDeviceById(tenantId, deviceId);
if (device != null) {
metaData.putValue("deviceName", device.getName());
metaData.putValue("deviceType", device.getType());
metaData.putValue(DataConstants.DEVICE_ID, deviceId.getId().toString());
}
ObjectNode data = JacksonUtil.OBJECT_MAPPER.createObjectNode();
data.put("method", deviceRpcCallMsg.getRequestMsg().getMethod());
data.put("params", deviceRpcCallMsg.getRequestMsg().getParams());
TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, null, metaData,
TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(data));
tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.debug("Successfully send TO_SERVER_RPC_REQUEST to rule engine [{}], deviceRpcCallMsg {}",
device, deviceRpcCallMsg);
}
@Override
public void onFailure(Throwable t) {
log.debug("Failed to send TO_SERVER_RPC_REQUEST to rule engine [{}], deviceRpcCallMsg {}",
device, deviceRpcCallMsg, t);
}
});
} catch (JsonProcessingException | IllegalArgumentException e) {
log.warn("[{}] Failed to push TO_SERVER_RPC_REQUEST to rule engine. deviceRpcCallMsg {}", deviceId, deviceRpcCallMsg, e);
}
return Futures.immediateFuture(null);
}
public DownlinkMsg convertDeviceEventToDownlink(EdgeEvent edgeEvent) { public DownlinkMsg convertDeviceEventToDownlink(EdgeEvent edgeEvent) {
DeviceId deviceId = new DeviceId(edgeEvent.getEntityId()); DeviceId deviceId = new DeviceId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null; DownlinkMsg downlinkMsg = null;
@ -413,11 +463,9 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
private DownlinkMsg convertRpcCallEventToDownlink(EdgeEvent edgeEvent) { private DownlinkMsg convertRpcCallEventToDownlink(EdgeEvent edgeEvent) {
log.trace("Executing convertRpcCallEventToDownlink, edgeEvent [{}]", edgeEvent); log.trace("Executing convertRpcCallEventToDownlink, edgeEvent [{}]", edgeEvent);
DeviceRpcCallMsg deviceRpcCallMsg =
deviceMsgConstructor.constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody());
return DownlinkMsg.newBuilder() return DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt()) .setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addDeviceRpcCallMsg(deviceRpcCallMsg) .addDeviceRpcCallMsg(deviceMsgConstructor.constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody()))
.build(); .build();
} }

View File

@ -511,8 +511,11 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
body.put("expirationTime", System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10)); body.put("expirationTime", System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10));
body.put("method", "test_method"); body.put("method", "test_method");
body.put("params", "{\"param1\":\"value1\"}"); body.put("params", "{\"param1\":\"value1\"}");
body.put("persisted", true);
body.put("retries", 2);
EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.RPC_CALL, device.getId().getId(), EdgeEventType.DEVICE, body); EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.RPC_CALL,
device.getId().getId(), EdgeEventType.DEVICE, body);
edgeImitator.expectMessageAmount(1); edgeImitator.expectMessageAmount(1);
edgeEventService.saveAsync(edgeEvent).get(); edgeEventService.saveAsync(edgeEvent).get();
clusterService.onEdgeEventUpdate(tenantId, edge.getId()); clusterService.onEdgeEventUpdate(tenantId, edge.getId());
@ -522,6 +525,8 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
Assert.assertTrue(latestMessage instanceof DeviceRpcCallMsg); Assert.assertTrue(latestMessage instanceof DeviceRpcCallMsg);
DeviceRpcCallMsg latestDeviceRpcCallMsg = (DeviceRpcCallMsg) latestMessage; DeviceRpcCallMsg latestDeviceRpcCallMsg = (DeviceRpcCallMsg) latestMessage;
Assert.assertEquals("test_method", latestDeviceRpcCallMsg.getRequestMsg().getMethod()); Assert.assertEquals("test_method", latestDeviceRpcCallMsg.getRequestMsg().getMethod());
Assert.assertTrue(latestDeviceRpcCallMsg.getPersisted());
Assert.assertEquals(2, latestDeviceRpcCallMsg.getRetries());
} }
private void sendAttributesRequestAndVerify(Device device, String scope, String attributesDataStr, String expectedKey, private void sendAttributesRequestAndVerify(Device device, String scope, String attributesDataStr, String expectedKey,

View File

@ -40,6 +40,8 @@ public class DataConstants {
public static final String EXPIRATION_TIME = "expirationTime"; public static final String EXPIRATION_TIME = "expirationTime";
public static final String ADDITIONAL_INFO = "additionalInfo"; public static final String ADDITIONAL_INFO = "additionalInfo";
public static final String RETRIES = "retries"; public static final String RETRIES = "retries";
public static final String EDGE_ID = "edgeId";
public static final String DEVICE_ID = "deviceId";
public static final String COAP_TRANSPORT_NAME = "COAP"; public static final String COAP_TRANSPORT_NAME = "COAP";
public static final String LWM2M_TRANSPORT_NAME = "LWM2M"; public static final String LWM2M_TRANSPORT_NAME = "LWM2M";
public static final String MQTT_TRANSPORT_NAME = "MQTT"; public static final String MQTT_TRANSPORT_NAME = "MQTT";

View File

@ -430,6 +430,11 @@ message DeviceRpcCallMsg {
bool oneway = 7; bool oneway = 7;
RpcRequestMsg requestMsg = 8; RpcRequestMsg requestMsg = 8;
RpcResponseMsg responseMsg = 9; RpcResponseMsg responseMsg = 9;
optional bool persisted = 10;
optional int32 retries = 11;
optional string additionalInfo = 12;
optional string serviceId = 13;
optional string sessionId = 14;
} }
message RpcRequestMsg { message RpcRequestMsg {

View File

@ -731,6 +731,8 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
ConstraintViolationException e = extractConstraintViolationException(t).orElse(null); ConstraintViolationException e = extractConstraintViolationException(t).orElse(null);
if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("fk_default_rule_chain_device_profile")) { if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("fk_default_rule_chain_device_profile")) {
throw new DataValidationException("The rule chain referenced by the device profiles cannot be deleted!"); throw new DataValidationException("The rule chain referenced by the device profiles cannot be deleted!");
} else if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("fk_default_rule_chain_asset_profile")) {
throw new DataValidationException("The rule chain referenced by the asset profiles cannot be deleted!");
} else { } else {
throw t; throw t;
} }

View File

@ -4,3 +4,4 @@ LOG_FOLDER=logs
LOGGER_FILENAME=tb-js-executor-%DATE%.log LOGGER_FILENAME=tb-js-executor-%DATE%.log
DOCKER_MODE=true DOCKER_MODE=true
SCRIPT_BODY_TRACE_FREQUENCY=1000 SCRIPT_BODY_TRACE_FREQUENCY=1000
NODE_OPTIONS="--max-old-space-size=200"

View File

@ -39,6 +39,7 @@ const TIMEOUT_ERROR = 2;
const NOT_FOUND_ERROR = 3; const NOT_FOUND_ERROR = 3;
const statFrequency = Number(config.get('script.stat_print_frequency')); const statFrequency = Number(config.get('script.stat_print_frequency'));
const memoryUsageTraceFrequency = Number(config.get('script.memory_usage_trace_frequency'));
const scriptBodyTraceFrequency = Number(config.get('script.script_body_trace_frequency')); const scriptBodyTraceFrequency = Number(config.get('script.script_body_trace_frequency'));
const useSandbox = config.get('script.use_sandbox') === 'true'; const useSandbox = config.get('script.use_sandbox') === 'true';
const maxActiveScripts = Number(config.get('script.max_active_scripts')); const maxActiveScripts = Number(config.get('script.max_active_scripts'));
@ -167,6 +168,10 @@ export class JsInvokeMessageProcessor {
if (this.executedScriptsCounter % scriptBodyTraceFrequency == 0) { if (this.executedScriptsCounter % scriptBodyTraceFrequency == 0) {
this.logger.info('[%s] Executing script body: [%s]', scriptId, invokeRequest.scriptBody); this.logger.info('[%s] Executing script body: [%s]', scriptId, invokeRequest.scriptBody);
} }
if (this.executedScriptsCounter % memoryUsageTraceFrequency == 0) {
this.logger.info('Current memory usage: [%s]', process.memoryUsage());
}
this.getOrCompileScript(scriptId, invokeRequest.scriptBody).then( this.getOrCompileScript(scriptId, invokeRequest.scriptBody).then(
(script) => { (script) => {
this.executor.executeScript(script, invokeRequest.args, invokeRequest.timeout).then( this.executor.executeScript(script, invokeRequest.args, invokeRequest.timeout).then(

View File

@ -75,6 +75,7 @@ logger:
script: script:
use_sandbox: "SCRIPT_USE_SANDBOX" use_sandbox: "SCRIPT_USE_SANDBOX"
memory_usage_trace_frequency: "MEMORY_USAGE_TRACE_FREQUENCY"
stat_print_frequency: "SCRIPT_STAT_PRINT_FREQUENCY" stat_print_frequency: "SCRIPT_STAT_PRINT_FREQUENCY"
script_body_trace_frequency: "SCRIPT_BODY_TRACE_FREQUENCY" script_body_trace_frequency: "SCRIPT_BODY_TRACE_FREQUENCY"
max_active_scripts: "MAX_ACTIVE_SCRIPTS" max_active_scripts: "MAX_ACTIVE_SCRIPTS"

View File

@ -64,6 +64,7 @@ logger:
script: script:
use_sandbox: "true" use_sandbox: "true"
memory_usage_trace_frequency: "1000"
script_body_trace_frequency: "10000" script_body_trace_frequency: "10000"
stat_print_frequency: "10000" stat_print_frequency: "10000"
max_active_scripts: "1000" max_active_scripts: "1000"

View File

@ -27,4 +27,4 @@ source "${CONF_FOLDER}/${configfile}"
cd ${pkg.installFolder} cd ${pkg.installFolder}
# This will forward this PID 1 to the node.js and forward SIGTERM for graceful shutdown as well # This will forward this PID 1 to the node.js and forward SIGTERM for graceful shutdown as well
exec node server.js exec node --no-compilation-cache server.js

View File

@ -77,7 +77,7 @@
<zookeeper.version>3.5.5</zookeeper.version> <zookeeper.version>3.5.5</zookeeper.version>
<protobuf.version>3.21.9</protobuf.version> <protobuf.version>3.21.9</protobuf.version>
<grpc.version>1.42.1</grpc.version> <grpc.version>1.42.1</grpc.version>
<mvel.version>2.4.23TB</mvel.version> <mvel.version>2.4.25TB</mvel.version>
<lombok.version>1.18.18</lombok.version> <lombok.version>1.18.18</lombok.version>
<paho.client.version>1.2.4</paho.client.version> <paho.client.version>1.2.4</paho.client.version>
<netty.version>4.1.75.Final</netty.version> <netty.version>4.1.75.Final</netty.version>

View File

@ -142,8 +142,11 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
actionType = EdgeEventActionType.ATTRIBUTES_UPDATED; actionType = EdgeEventActionType.ATTRIBUTES_UPDATED;
} else if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)) { } else if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)) {
actionType = EdgeEventActionType.POST_ATTRIBUTES; actionType = EdgeEventActionType.POST_ATTRIBUTES;
} else { } else if (DataConstants.ATTRIBUTES_DELETED.equals(msgType)) {
actionType = EdgeEventActionType.ATTRIBUTES_DELETED; actionType = EdgeEventActionType.ATTRIBUTES_DELETED;
} else {
log.warn("Unsupported msg type [{}]", msgType);
throw new IllegalArgumentException("Unsupported msg type: " + msgType);
} }
return actionType; return actionType;
} }

View File

@ -15,15 +15,27 @@
*/ */
package org.thingsboard.rule.engine.rpc; package org.thingsboard.rule.engine.rpc;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
@ -65,9 +77,46 @@ public class TbSendRPCReplyNode implements TbNode {
} else if (StringUtils.isEmpty(msg.getData())) { } else if (StringUtils.isEmpty(msg.getData())) {
ctx.tellFailure(msg, new RuntimeException("Request body is empty!")); ctx.tellFailure(msg, new RuntimeException("Request body is empty!"));
} else { } else {
ctx.getRpcService().sendRpcReplyToDevice(serviceIdStr, UUID.fromString(sessionIdStr), Integer.parseInt(requestIdStr), msg.getData()); if (StringUtils.isNotBlank(msg.getMetaData().getValue(DataConstants.EDGE_ID))) {
ctx.tellSuccess(msg); saveRpcResponseToEdgeQueue(ctx, msg, serviceIdStr, sessionIdStr, requestIdStr);
} else {
ctx.getRpcService().sendRpcReplyToDevice(serviceIdStr, UUID.fromString(sessionIdStr), Integer.parseInt(requestIdStr), msg.getData());
ctx.tellSuccess(msg);
}
} }
} }
private void saveRpcResponseToEdgeQueue(TbContext ctx, TbMsg msg, String serviceIdStr, String sessionIdStr, String requestIdStr) {
EdgeId edgeId;
DeviceId deviceId;
try {
edgeId = new EdgeId(UUID.fromString(msg.getMetaData().getValue(DataConstants.EDGE_ID)));
deviceId = new DeviceId(UUID.fromString(msg.getMetaData().getValue(DataConstants.DEVICE_ID)));
} catch (Exception e) {
String errMsg = String.format("[%s] Failed to parse edgeId or deviceId from metadata %s!", ctx.getTenantId(), msg.getMetaData());
ctx.tellFailure(msg, new RuntimeException(errMsg));
return;
}
ObjectNode body = JacksonUtil.OBJECT_MAPPER.createObjectNode();
body.put("serviceId", serviceIdStr);
body.put("sessionId", sessionIdStr);
body.put("requestId", requestIdStr);
body.put("response", msg.getData());
EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(ctx.getTenantId(), edgeId, EdgeEventType.DEVICE,
EdgeEventActionType.RPC_CALL, deviceId, JacksonUtil.OBJECT_MAPPER.valueToTree(body));
ListenableFuture<Void> future = ctx.getEdgeEventService().saveAsync(edgeEvent);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Void result) {
ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId);
ctx.tellSuccess(msg);
}
@Override
public void onFailure(Throwable t) {
ctx.tellFailure(msg, t);
}
}, ctx.getDbCallbackExecutor());
}
} }

View File

@ -0,0 +1,119 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.rpc;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.dao.edge.EdgeEventService;
import java.util.UUID;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class TbSendRPCReplyNodeTest {
private static final String DUMMY_SERVICE_ID = "testServiceId";
private static final int DUMMY_REQUEST_ID = 0;
private static final UUID DUMMY_SESSION_ID = UUID.randomUUID();
private static final String DUMMY_DATA = "{\"key\":\"value\"}";
TbSendRPCReplyNode node;
private final TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
private final DeviceId deviceId = new DeviceId(UUID.randomUUID());
@Mock
private TbContext ctx;
@Mock
private RuleEngineRpcService rpcService;
@Mock
private EdgeEventService edgeEventService;
@Mock
private ListeningExecutor listeningExecutor;
@Before
public void setUp() throws TbNodeException {
node = new TbSendRPCReplyNode();
TbSendRpcReplyNodeConfiguration config = new TbSendRpcReplyNodeConfiguration().defaultConfiguration();
node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
}
@Test
public void sendReplyToTransport() {
Mockito.when(ctx.getRpcService()).thenReturn(rpcService);
TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, getDefaultMetadata(),
TbMsgDataType.JSON, DUMMY_DATA, null, null);
node.onMsg(ctx, msg);
verify(rpcService).sendRpcReplyToDevice(DUMMY_SERVICE_ID, DUMMY_SESSION_ID, DUMMY_REQUEST_ID, DUMMY_DATA);
verify(edgeEventService, never()).saveAsync(any());
}
@Test
public void sendReplyToEdgeQueue() {
Mockito.when(ctx.getTenantId()).thenReturn(tenantId);
Mockito.when(ctx.getEdgeEventService()).thenReturn(edgeEventService);
Mockito.when(edgeEventService.saveAsync(any())).thenReturn(SettableFuture.create());
Mockito.when(ctx.getDbCallbackExecutor()).thenReturn(listeningExecutor);
TbMsgMetaData defaultMetadata = getDefaultMetadata();
defaultMetadata.putValue(DataConstants.EDGE_ID, UUID.randomUUID().toString());
defaultMetadata.putValue(DataConstants.DEVICE_ID, UUID.randomUUID().toString());
TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, defaultMetadata,
TbMsgDataType.JSON, DUMMY_DATA, null, null);
node.onMsg(ctx, msg);
verify(edgeEventService).saveAsync(any());
verify(rpcService, never()).sendRpcReplyToDevice(DUMMY_SERVICE_ID, DUMMY_SESSION_ID, DUMMY_REQUEST_ID, DUMMY_DATA);
}
private TbMsgMetaData getDefaultMetadata() {
TbSendRpcReplyNodeConfiguration config = new TbSendRpcReplyNodeConfiguration().defaultConfiguration();
TbMsgMetaData metadata = new TbMsgMetaData();
metadata.putValue(config.getServiceIdMetaDataAttribute(), DUMMY_SERVICE_ID);
metadata.putValue(config.getSessionIdMetaDataAttribute(), DUMMY_SESSION_ID.toString());
metadata.putValue(config.getRequestIdMetaDataAttribute(), Integer.toString(DUMMY_REQUEST_ID));
return metadata;
}
}