diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 2ffe2002f9..8a365b4e56 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -823,6 +823,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { body.put("expirationTime", msg.getExpirationTime()); body.put("method", msg.getBody().getMethod()); body.put("params", msg.getBody().getParams()); + body.put("persisted", msg.isPersisted()); + body.put("retries", msg.getRetries()); + body.put("additionalInfo", msg.getAdditionalInfo()); EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.DEVICE, EdgeEventActionType.RPC_CALL, deviceId, body); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 2342997c65..957c699e32 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -611,7 +611,7 @@ public final class EdgeGrpcSession implements Closeable { } if (uplinkMsg.getDeviceRpcCallMsgCount() > 0) { for (DeviceRpcCallMsg deviceRpcCallMsg : uplinkMsg.getDeviceRpcCallMsgList()) { - result.add(ctx.getDeviceProcessor().processDeviceRpcCallResponseFromEdge(edge.getTenantId(), deviceRpcCallMsg)); + result.add(ctx.getDeviceProcessor().processDeviceRpcCallFromEdge(edge.getTenantId(), edge, deviceRpcCallMsg)); } } if (uplinkMsg.getWidgetBundleTypesRequestMsgCount() > 0) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java index 511910dd8a..522bcd3a4e 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java @@ -21,13 +21,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.v1.DeviceRpcCallMsg; import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg; import org.thingsboard.server.gen.edge.v1.RpcRequestMsg; +import org.thingsboard.server.gen.edge.v1.RpcResponseMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbCoreComponent; @@ -97,25 +97,55 @@ public class DeviceMsgConstructor { } public DeviceRpcCallMsg constructDeviceRpcCallMsg(UUID deviceId, JsonNode body) { - int requestId = body.get("requestId").asInt(); - boolean oneway = body.get("oneway").asBoolean(); - UUID requestUUID = UUID.fromString(body.get("requestUUID").asText()); - long expirationTime = body.get("expirationTime").asLong(); - String method = body.get("method").asText(); - String params = body.get("params").asText(); + DeviceRpcCallMsg.Builder builder = constructDeviceRpcMsg(deviceId, body); + if (body.has("error") || body.has("response")) { + RpcResponseMsg.Builder responseBuilder = RpcResponseMsg.newBuilder(); + if (body.has("error")) { + responseBuilder.setError(body.get("error").asText()); + } else { + responseBuilder.setResponse(body.get("response").asText()); + } + builder.setResponseMsg(responseBuilder.build()); + } else { + RpcRequestMsg.Builder requestBuilder = RpcRequestMsg.newBuilder(); + requestBuilder.setMethod(body.get("method").asText()); + requestBuilder.setParams(body.get("params").asText()); + builder.setRequestMsg(requestBuilder.build()); + } + return builder.build(); + } - RpcRequestMsg.Builder requestBuilder = RpcRequestMsg.newBuilder(); - requestBuilder.setMethod(method); - requestBuilder.setParams(params); + private DeviceRpcCallMsg.Builder constructDeviceRpcMsg(UUID deviceId, JsonNode body) { DeviceRpcCallMsg.Builder builder = DeviceRpcCallMsg.newBuilder() .setDeviceIdMSB(deviceId.getMostSignificantBits()) .setDeviceIdLSB(deviceId.getLeastSignificantBits()) - .setRequestUuidMSB(requestUUID.getMostSignificantBits()) - .setRequestUuidLSB(requestUUID.getLeastSignificantBits()) - .setRequestId(requestId) - .setExpirationTime(expirationTime) - .setOneway(oneway) - .setRequestMsg(requestBuilder.build()); - return builder.build(); + .setRequestId(body.get("requestId").asInt()); + if (body.get("oneway") != null) { + builder.setOneway(body.get("oneway").asBoolean()); + } + if (body.get("requestUUID") != null) { + UUID requestUUID = UUID.fromString(body.get("requestUUID").asText()); + builder.setRequestUuidMSB(requestUUID.getMostSignificantBits()) + .setRequestUuidLSB(requestUUID.getLeastSignificantBits()); + } + if (body.get("expirationTime") != null) { + builder.setExpirationTime(body.get("expirationTime").asLong()); + } + if (body.get("persisted") != null) { + builder.setPersisted(body.get("persisted").asBoolean()); + } + if (body.get("retries") != null) { + builder.setRetries(body.get("retries").asInt()); + } + if (body.get("additionalInfo") != null) { + builder.setAdditionalInfo(JacksonUtil.toString(body.get("additionalInfo"))); + } + if (body.get("serviceId") != null) { + builder.setServiceId(body.get("serviceId").asText()); + } + if (body.get("sessionId") != null) { + builder.setSessionId(body.get("sessionId").asText()); + } + return builder; } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java index e09e036953..81e7970611 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java @@ -52,6 +52,7 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsRequestMsg; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; @@ -325,8 +326,17 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { return metaData; } - public ListenableFuture processDeviceRpcCallResponseFromEdge(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) { - log.trace("[{}] processDeviceRpcCallResponseMsg [{}]", tenantId, deviceRpcCallMsg); + public ListenableFuture processDeviceRpcCallFromEdge(TenantId tenantId, Edge edge, DeviceRpcCallMsg deviceRpcCallMsg) { + log.trace("[{}] processDeviceRpcCallFromEdge [{}]", tenantId, deviceRpcCallMsg); + if (deviceRpcCallMsg.hasResponseMsg()) { + return processDeviceRpcResponseFromEdge(tenantId, deviceRpcCallMsg); + } else if (deviceRpcCallMsg.hasRequestMsg()) { + return processDeviceRpcRequestFromEdge(tenantId, edge, deviceRpcCallMsg); + } + return Futures.immediateFuture(null); + } + + private ListenableFuture processDeviceRpcResponseFromEdge(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) { SettableFuture futureToSet = SettableFuture.create(); UUID requestUuid = new UUID(deviceRpcCallMsg.getRequestUuidMSB(), deviceRpcCallMsg.getRequestUuidLSB()); DeviceId deviceId = new DeviceId(new UUID(deviceRpcCallMsg.getDeviceIdMSB(), deviceRpcCallMsg.getDeviceIdLSB())); @@ -357,6 +367,46 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { return futureToSet; } + private ListenableFuture processDeviceRpcRequestFromEdge(TenantId tenantId, Edge edge, DeviceRpcCallMsg deviceRpcCallMsg) { + DeviceId deviceId = new DeviceId(new UUID(deviceRpcCallMsg.getDeviceIdMSB(), deviceRpcCallMsg.getDeviceIdLSB())); + try { + TbMsgMetaData metaData = new TbMsgMetaData(); + String requestId = Integer.toString(deviceRpcCallMsg.getRequestId()); + metaData.putValue("requestId", requestId); + metaData.putValue("serviceId", deviceRpcCallMsg.getServiceId()); + metaData.putValue("sessionId", deviceRpcCallMsg.getSessionId()); + metaData.putValue(DataConstants.EDGE_ID, edge.getId().toString()); + Device device = deviceService.findDeviceById(tenantId, deviceId); + if (device != null) { + metaData.putValue("deviceName", device.getName()); + metaData.putValue("deviceType", device.getType()); + metaData.putValue(DataConstants.DEVICE_ID, deviceId.getId().toString()); + } + ObjectNode data = JacksonUtil.OBJECT_MAPPER.createObjectNode(); + data.put("method", deviceRpcCallMsg.getRequestMsg().getMethod()); + data.put("params", deviceRpcCallMsg.getRequestMsg().getParams()); + TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, null, metaData, + TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(data)); + tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + log.debug("Successfully send TO_SERVER_RPC_REQUEST to rule engine [{}], deviceRpcCallMsg {}", + device, deviceRpcCallMsg); + } + + @Override + public void onFailure(Throwable t) { + log.debug("Failed to send TO_SERVER_RPC_REQUEST to rule engine [{}], deviceRpcCallMsg {}", + device, deviceRpcCallMsg, t); + } + }); + } catch (JsonProcessingException | IllegalArgumentException e) { + log.warn("[{}] Failed to push TO_SERVER_RPC_REQUEST to rule engine. deviceRpcCallMsg {}", deviceId, deviceRpcCallMsg, e); + } + + return Futures.immediateFuture(null); + } + public DownlinkMsg convertDeviceEventToDownlink(EdgeEvent edgeEvent) { DeviceId deviceId = new DeviceId(edgeEvent.getEntityId()); DownlinkMsg downlinkMsg = null; @@ -413,11 +463,9 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { private DownlinkMsg convertRpcCallEventToDownlink(EdgeEvent edgeEvent) { log.trace("Executing convertRpcCallEventToDownlink, edgeEvent [{}]", edgeEvent); - DeviceRpcCallMsg deviceRpcCallMsg = - deviceMsgConstructor.constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody()); return DownlinkMsg.newBuilder() .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) - .addDeviceRpcCallMsg(deviceRpcCallMsg) + .addDeviceRpcCallMsg(deviceMsgConstructor.constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody())) .build(); } diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java index 392a6814d4..ffe21ae961 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java @@ -511,8 +511,11 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { body.put("expirationTime", System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10)); body.put("method", "test_method"); body.put("params", "{\"param1\":\"value1\"}"); + body.put("persisted", true); + body.put("retries", 2); - EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.RPC_CALL, device.getId().getId(), EdgeEventType.DEVICE, body); + EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.RPC_CALL, + device.getId().getId(), EdgeEventType.DEVICE, body); edgeImitator.expectMessageAmount(1); edgeEventService.saveAsync(edgeEvent).get(); clusterService.onEdgeEventUpdate(tenantId, edge.getId()); @@ -522,6 +525,8 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { Assert.assertTrue(latestMessage instanceof DeviceRpcCallMsg); DeviceRpcCallMsg latestDeviceRpcCallMsg = (DeviceRpcCallMsg) latestMessage; Assert.assertEquals("test_method", latestDeviceRpcCallMsg.getRequestMsg().getMethod()); + Assert.assertTrue(latestDeviceRpcCallMsg.getPersisted()); + Assert.assertEquals(2, latestDeviceRpcCallMsg.getRetries()); } private void sendAttributesRequestAndVerify(Device device, String scope, String attributesDataStr, String expectedKey, diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 85b9e681a1..69c36697d3 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -40,6 +40,8 @@ public class DataConstants { public static final String EXPIRATION_TIME = "expirationTime"; public static final String ADDITIONAL_INFO = "additionalInfo"; public static final String RETRIES = "retries"; + public static final String EDGE_ID = "edgeId"; + public static final String DEVICE_ID = "deviceId"; public static final String COAP_TRANSPORT_NAME = "COAP"; public static final String LWM2M_TRANSPORT_NAME = "LWM2M"; public static final String MQTT_TRANSPORT_NAME = "MQTT"; diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index a2e3b5989b..249f251238 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -430,6 +430,11 @@ message DeviceRpcCallMsg { bool oneway = 7; RpcRequestMsg requestMsg = 8; RpcResponseMsg responseMsg = 9; + optional bool persisted = 10; + optional int32 retries = 11; + optional string additionalInfo = 12; + optional string serviceId = 13; + optional string sessionId = 14; } message RpcRequestMsg { diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java index 420739e37c..1d11d34b2d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java @@ -731,6 +731,8 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC ConstraintViolationException e = extractConstraintViolationException(t).orElse(null); if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("fk_default_rule_chain_device_profile")) { throw new DataValidationException("The rule chain referenced by the device profiles cannot be deleted!"); + } else if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("fk_default_rule_chain_asset_profile")) { + throw new DataValidationException("The rule chain referenced by the asset profiles cannot be deleted!"); } else { throw t; } diff --git a/docker/tb-js-executor.env b/docker/tb-js-executor.env index e080906549..1938449d53 100644 --- a/docker/tb-js-executor.env +++ b/docker/tb-js-executor.env @@ -3,4 +3,5 @@ LOGGER_LEVEL=info LOG_FOLDER=logs LOGGER_FILENAME=tb-js-executor-%DATE%.log DOCKER_MODE=true -SCRIPT_BODY_TRACE_FREQUENCY=1000 \ No newline at end of file +SCRIPT_BODY_TRACE_FREQUENCY=1000 +NODE_OPTIONS="--max-old-space-size=200" diff --git a/msa/js-executor/api/jsInvokeMessageProcessor.ts b/msa/js-executor/api/jsInvokeMessageProcessor.ts index 668cd61f50..e69209fa01 100644 --- a/msa/js-executor/api/jsInvokeMessageProcessor.ts +++ b/msa/js-executor/api/jsInvokeMessageProcessor.ts @@ -39,6 +39,7 @@ const TIMEOUT_ERROR = 2; const NOT_FOUND_ERROR = 3; const statFrequency = Number(config.get('script.stat_print_frequency')); +const memoryUsageTraceFrequency = Number(config.get('script.memory_usage_trace_frequency')); const scriptBodyTraceFrequency = Number(config.get('script.script_body_trace_frequency')); const useSandbox = config.get('script.use_sandbox') === 'true'; const maxActiveScripts = Number(config.get('script.max_active_scripts')); @@ -167,6 +168,10 @@ export class JsInvokeMessageProcessor { if (this.executedScriptsCounter % scriptBodyTraceFrequency == 0) { this.logger.info('[%s] Executing script body: [%s]', scriptId, invokeRequest.scriptBody); } + if (this.executedScriptsCounter % memoryUsageTraceFrequency == 0) { + this.logger.info('Current memory usage: [%s]', process.memoryUsage()); + } + this.getOrCompileScript(scriptId, invokeRequest.scriptBody).then( (script) => { this.executor.executeScript(script, invokeRequest.args, invokeRequest.timeout).then( diff --git a/msa/js-executor/config/custom-environment-variables.yml b/msa/js-executor/config/custom-environment-variables.yml index b9c24c8d8d..2ebea4ccc1 100644 --- a/msa/js-executor/config/custom-environment-variables.yml +++ b/msa/js-executor/config/custom-environment-variables.yml @@ -75,6 +75,7 @@ logger: script: use_sandbox: "SCRIPT_USE_SANDBOX" + memory_usage_trace_frequency: "MEMORY_USAGE_TRACE_FREQUENCY" stat_print_frequency: "SCRIPT_STAT_PRINT_FREQUENCY" script_body_trace_frequency: "SCRIPT_BODY_TRACE_FREQUENCY" max_active_scripts: "MAX_ACTIVE_SCRIPTS" diff --git a/msa/js-executor/config/default.yml b/msa/js-executor/config/default.yml index 64829ef792..96f3401da5 100644 --- a/msa/js-executor/config/default.yml +++ b/msa/js-executor/config/default.yml @@ -64,6 +64,7 @@ logger: script: use_sandbox: "true" + memory_usage_trace_frequency: "1000" script_body_trace_frequency: "10000" stat_print_frequency: "10000" max_active_scripts: "1000" diff --git a/msa/js-executor/docker/start-js-executor.sh b/msa/js-executor/docker/start-js-executor.sh index 575f93c389..d30b62c145 100755 --- a/msa/js-executor/docker/start-js-executor.sh +++ b/msa/js-executor/docker/start-js-executor.sh @@ -27,4 +27,4 @@ source "${CONF_FOLDER}/${configfile}" cd ${pkg.installFolder} # This will forward this PID 1 to the node.js and forward SIGTERM for graceful shutdown as well -exec node server.js +exec node --no-compilation-cache server.js diff --git a/pom.xml b/pom.xml index 60e8783bca..5f5aa3703b 100755 --- a/pom.xml +++ b/pom.xml @@ -77,7 +77,7 @@ 3.5.5 3.21.9 1.42.1 - 2.4.23TB + 2.4.25TB 1.18.18 1.2.4 4.1.75.Final diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java index 28dc64c068..f120276df3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java @@ -142,8 +142,11 @@ public abstract class AbstractTbMsgPushNode future = ctx.getEdgeEventService().saveAsync(edgeEvent); + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(Void result) { + ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId); + ctx.tellSuccess(msg); + } + + @Override + public void onFailure(Throwable t) { + ctx.tellFailure(msg, t); + } + }, ctx.getDbCallbackExecutor()); + } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNodeTest.java new file mode 100644 index 0000000000..095505aff3 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rpc/TbSendRPCReplyNodeTest.java @@ -0,0 +1,119 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.rpc; + +import com.google.common.util.concurrent.SettableFuture; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ListeningExecutor; +import org.thingsboard.rule.engine.api.RuleEngineRpcService; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgDataType; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.session.SessionMsgType; +import org.thingsboard.server.dao.edge.EdgeEventService; + +import java.util.UUID; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +@RunWith(MockitoJUnitRunner.class) +public class TbSendRPCReplyNodeTest { + + private static final String DUMMY_SERVICE_ID = "testServiceId"; + private static final int DUMMY_REQUEST_ID = 0; + private static final UUID DUMMY_SESSION_ID = UUID.randomUUID(); + private static final String DUMMY_DATA = "{\"key\":\"value\"}"; + + TbSendRPCReplyNode node; + + private final TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); + private final DeviceId deviceId = new DeviceId(UUID.randomUUID()); + + @Mock + private TbContext ctx; + + @Mock + private RuleEngineRpcService rpcService; + + @Mock + private EdgeEventService edgeEventService; + + @Mock + private ListeningExecutor listeningExecutor; + + @Before + public void setUp() throws TbNodeException { + node = new TbSendRPCReplyNode(); + TbSendRpcReplyNodeConfiguration config = new TbSendRpcReplyNodeConfiguration().defaultConfiguration(); + node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + } + + @Test + public void sendReplyToTransport() { + Mockito.when(ctx.getRpcService()).thenReturn(rpcService); + + + TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, getDefaultMetadata(), + TbMsgDataType.JSON, DUMMY_DATA, null, null); + + node.onMsg(ctx, msg); + + verify(rpcService).sendRpcReplyToDevice(DUMMY_SERVICE_ID, DUMMY_SESSION_ID, DUMMY_REQUEST_ID, DUMMY_DATA); + verify(edgeEventService, never()).saveAsync(any()); + } + + @Test + public void sendReplyToEdgeQueue() { + Mockito.when(ctx.getTenantId()).thenReturn(tenantId); + Mockito.when(ctx.getEdgeEventService()).thenReturn(edgeEventService); + Mockito.when(edgeEventService.saveAsync(any())).thenReturn(SettableFuture.create()); + Mockito.when(ctx.getDbCallbackExecutor()).thenReturn(listeningExecutor); + + TbMsgMetaData defaultMetadata = getDefaultMetadata(); + defaultMetadata.putValue(DataConstants.EDGE_ID, UUID.randomUUID().toString()); + defaultMetadata.putValue(DataConstants.DEVICE_ID, UUID.randomUUID().toString()); + TbMsg msg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, defaultMetadata, + TbMsgDataType.JSON, DUMMY_DATA, null, null); + + node.onMsg(ctx, msg); + + verify(edgeEventService).saveAsync(any()); + verify(rpcService, never()).sendRpcReplyToDevice(DUMMY_SERVICE_ID, DUMMY_SESSION_ID, DUMMY_REQUEST_ID, DUMMY_DATA); + } + + private TbMsgMetaData getDefaultMetadata() { + TbSendRpcReplyNodeConfiguration config = new TbSendRpcReplyNodeConfiguration().defaultConfiguration(); + TbMsgMetaData metadata = new TbMsgMetaData(); + metadata.putValue(config.getServiceIdMetaDataAttribute(), DUMMY_SERVICE_ID); + metadata.putValue(config.getSessionIdMetaDataAttribute(), DUMMY_SESSION_ID.toString()); + metadata.putValue(config.getRequestIdMetaDataAttribute(), Integer.toString(DUMMY_REQUEST_ID)); + return metadata; + } +}