diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 16e79b6eeb..40b32d3072 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -53,7 +53,6 @@ import java.util.function.Consumer; */ class DefaultTbContext implements TbContext { - private static final Function, ? extends Void> LIST_VOID_FUNCTION = v -> null; private final ActorSystemContext mainCtx; private final RuleNodeCtx nodeCtx; @@ -120,7 +119,7 @@ class DefaultTbContext implements TbContext { @Override public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) { - return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data); + return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L); } @Override diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 847c49430e..fb7bc4902d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -169,12 +169,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor pushMsgToNode(firstNode, msg)); + putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg)); } void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) { checkActive(); - putToQueue(envelope.getTbMsg(), msg -> { + putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> { pushMsgToNode(firstNode, msg); envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self); }); @@ -185,7 +185,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor relations = nodeRoutes.get(originator).stream() - .filter(r -> targetRelationType == null || targetRelationType.equals(r.getType())) + .filter(r -> targetRelationType == null || targetRelationType.equalsIgnoreCase(r.getType())) .collect(Collectors.toList()); TbMsg msg = envelope.getMsg(); @@ -212,7 +212,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor extends Abstract } protected void putToQueue(final TbMsg tbMsg, final Consumer onSuccess) { + EntityId entityId = tbMsg.getRuleNodeId() != null ? tbMsg.getRuleNodeId() : tbMsg.getRuleChainId(); Futures.addCallback(queue.put(tbMsg, entityId.getId(), 0), new FutureCallback() { @Override public void onSuccess(@Nullable Void result) { diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java index af477640e6..b732e66f48 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java @@ -80,7 +80,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC } } - @Ignore @Test public void testServerMqttOneWayRpc() throws Exception { Device device = new Device(); @@ -107,7 +106,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC Assert.assertTrue(StringUtils.isEmpty(result)); } - @Ignore @Test public void testServerMqttOneWayRpcDeviceOffline() throws Exception { Device device = new Device(); diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java index 7c9c0586f0..d2287f04a9 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java @@ -15,7 +15,7 @@ */ package org.thingsboard.server.mqtt.rpc.sql; -import org.thingsboard.server.dao.service.DaoNoSqlTest; +import org.junit.Ignore; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.mqtt.rpc.AbstractMqttServerSideRpcIntegrationTest; diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java index f45e30335f..2d56772df3 100644 --- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java @@ -150,7 +150,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule "CUSTOM", device.getId(), new TbMsgMetaData(), - "{}"); + "{}", null, null, 0L); actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)); Thread.sleep(3000); diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java index 29e8b73561..18583fdc7e 100644 --- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java @@ -138,7 +138,8 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac "CUSTOM", device.getId(), new TbMsgMetaData(), - "{}"); + "{}", + null, null, 0L); actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)); Thread.sleep(3000); diff --git a/application/src/test/java/org/thingsboard/server/service/script/NashornJsEngineTest.java b/application/src/test/java/org/thingsboard/server/service/script/NashornJsEngineTest.java index e6a48e2838..1981287f6d 100644 --- a/application/src/test/java/org/thingsboard/server/service/script/NashornJsEngineTest.java +++ b/application/src/test/java/org/thingsboard/server/service/script/NashornJsEngineTest.java @@ -42,7 +42,7 @@ public class NashornJsEngineTest { metaData.putValue("humidity", "99"); String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L); TbMsg actual = scriptEngine.executeUpdate(msg); assertEquals("70", actual.getMetaData().getValue("temp")); @@ -57,7 +57,7 @@ public class NashornJsEngineTest { metaData.putValue("humidity", "99"); String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L); TbMsg actual = scriptEngine.executeUpdate(msg); assertEquals("94", actual.getMetaData().getValue("newAttr")); @@ -72,7 +72,7 @@ public class NashornJsEngineTest { metaData.putValue("humidity", "99"); String rawJson = "{\"name\":\"Vit\",\"passed\": 5,\"bigObj\":{\"prop\":42}}"; - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L); TbMsg actual = scriptEngine.executeUpdate(msg); @@ -89,7 +89,7 @@ public class NashornJsEngineTest { metaData.putValue("humidity", "99"); String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L); assertFalse(scriptEngine.executeFilter(msg)); } @@ -102,7 +102,7 @@ public class NashornJsEngineTest { metaData.putValue("humidity", "99"); String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L); assertTrue(scriptEngine.executeFilter(msg)); } @@ -122,7 +122,7 @@ public class NashornJsEngineTest { metaData.putValue("humidity", "99"); String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L); Set actual = scriptEngine.executeSwitch(msg); assertEquals(Sets.newHashSet("one"), actual); } @@ -143,7 +143,7 @@ public class NashornJsEngineTest { metaData.putValue("humidity", "99"); String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L); Set actual = scriptEngine.executeSwitch(msg); assertEquals(Sets.newHashSet("one", "three"), actual); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/queue/DummySqlMsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/DummySqlMsgQueue.java new file mode 100644 index 0000000000..0281d409d2 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/DummySqlMsgQueue.java @@ -0,0 +1,50 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql.queue; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.dao.queue.MsgQueue; +import org.thingsboard.server.dao.util.SqlDao; + +import java.util.Collections; +import java.util.UUID; + +/** + * Created by ashvayka on 27.04.18. + */ +@Component +@Slf4j +@SqlDao +public class DummySqlMsgQueue implements MsgQueue { + @Override + public ListenableFuture put(TbMsg msg, UUID nodeId, long clusterPartition) { + return Futures.immediateFuture(null); + } + + @Override + public ListenableFuture ack(TbMsg msg, UUID nodeId, long clusterPartition) { + return Futures.immediateFuture(null); + } + + @Override + public Iterable findUnprocessed(UUID nodeId, long clusterPartition) { + return Collections.emptyList(); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java index 3c6704b013..4ee8bd5461 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java @@ -61,7 +61,7 @@ public class TbJsSwitchNode implements TbNode { ctx.tellNext(msg, nextRelations); } - @Override + @Override public void destroy() { if (jsEngine != null) { jsEngine.destroy(); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java index 28713038a3..8213195711 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java @@ -118,17 +118,21 @@ public class TbAlarmNodeTest { node.onMsg(ctx, msg); - ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); - verify(ctx).tellNext(captor.capture(), eq("Created")); - TbMsg actualMsg = captor.getValue(); + verify(ctx).tellNext(any(), eq("Created")); - assertEquals("ALARM", actualMsg.getType()); - assertEquals(originator, actualMsg.getOriginator()); - assertEquals("value", actualMsg.getMetaData().getValue("key")); - assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_NEW_ALARM)); - assertNotSame(metaData, actualMsg.getMetaData()); + ArgumentCaptor typeCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor originatorCaptor = ArgumentCaptor.forClass(EntityId.class); + ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class); + ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(String.class); + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture()); - Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class); + assertEquals("ALARM", typeCaptor.getValue()); + assertEquals(originator, originatorCaptor.getValue()); + assertEquals("value", metadataCaptor.getValue().getValue("key")); + assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_NEW_ALARM)); + assertNotSame(metaData, metadataCaptor.getValue()); + + Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class); Alarm expectedAlarm = Alarm.builder() .tenantId(tenantId) .originator(originator) @@ -208,17 +212,22 @@ public class TbAlarmNodeTest { node.onMsg(ctx, msg); - ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); - verify(ctx).tellNext(captor.capture(), eq("Created")); - TbMsg actualMsg = captor.getValue(); + verify(ctx).tellNext(any(), eq("Created")); - assertEquals("ALARM", actualMsg.getType()); - assertEquals(originator, actualMsg.getOriginator()); - assertEquals("value", actualMsg.getMetaData().getValue("key")); - assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_NEW_ALARM)); - assertNotSame(metaData, actualMsg.getMetaData()); + ArgumentCaptor typeCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor originatorCaptor = ArgumentCaptor.forClass(EntityId.class); + ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class); + ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(String.class); + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture()); - Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class); + assertEquals("ALARM", typeCaptor.getValue()); + assertEquals(originator, originatorCaptor.getValue()); + assertEquals("value", metadataCaptor.getValue().getValue("key")); + assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_NEW_ALARM)); + assertNotSame(metaData, metadataCaptor.getValue()); + + + Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class); Alarm expectedAlarm = Alarm.builder() .tenantId(tenantId) .originator(originator) @@ -252,17 +261,21 @@ public class TbAlarmNodeTest { node.onMsg(ctx, msg); - ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); - verify(ctx).tellNext(captor.capture(), eq("Updated")); - TbMsg actualMsg = captor.getValue(); + verify(ctx).tellNext(any(), eq("Updated")); - assertEquals("ALARM", actualMsg.getType()); - assertEquals(originator, actualMsg.getOriginator()); - assertEquals("value", actualMsg.getMetaData().getValue("key")); - assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_EXISTING_ALARM)); - assertNotSame(metaData, actualMsg.getMetaData()); + ArgumentCaptor typeCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor originatorCaptor = ArgumentCaptor.forClass(EntityId.class); + ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class); + ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(String.class); + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture()); - Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class); + assertEquals("ALARM", typeCaptor.getValue()); + assertEquals(originator, originatorCaptor.getValue()); + assertEquals("value", metadataCaptor.getValue().getValue("key")); + assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_EXISTING_ALARM)); + assertNotSame(metaData, metadataCaptor.getValue()); + + Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class); assertTrue(activeAlarm.getEndTs() > oldEndDate); Alarm expectedAlarm = Alarm.builder() .tenantId(tenantId) @@ -298,17 +311,21 @@ public class TbAlarmNodeTest { node.onMsg(ctx, msg); - ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); - verify(ctx).tellNext(captor.capture(), eq("Cleared")); - TbMsg actualMsg = captor.getValue(); + verify(ctx).tellNext(any(), eq("Cleared")); - assertEquals("ALARM", actualMsg.getType()); - assertEquals(originator, actualMsg.getOriginator()); - assertEquals("value", actualMsg.getMetaData().getValue("key")); - assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_CLEARED_ALARM)); - assertNotSame(metaData, actualMsg.getMetaData()); + ArgumentCaptor typeCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor originatorCaptor = ArgumentCaptor.forClass(EntityId.class); + ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class); + ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(String.class); + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture()); - Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class); + assertEquals("ALARM", typeCaptor.getValue()); + assertEquals(originator, originatorCaptor.getValue()); + assertEquals("value", metadataCaptor.getValue().getValue("key")); + assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_CLEARED_ALARM)); + assertNotSame(metaData, metadataCaptor.getValue()); + + Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class); Alarm expectedAlarm = Alarm.builder() .tenantId(tenantId) .originator(originator) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java index 2e5d947d18..fe19a6d245 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java @@ -36,7 +36,9 @@ import java.io.IOException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class TbMsgToEmailNodeTest { @@ -62,17 +64,19 @@ public class TbMsgToEmailNodeTest { emailNode.onMsg(ctx, msg); - ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); - verify(ctx).tellNext(captor.capture()); - TbMsg actualMsg = captor.getValue(); - - assertEquals("SEND_EMAIL", actualMsg.getType()); - assertEquals(originator, actualMsg.getOriginator()); - assertEquals("oreo", actualMsg.getMetaData().getValue("username")); - assertNotSame(metaData, actualMsg.getMetaData()); + ArgumentCaptor typeCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor originatorCaptor = ArgumentCaptor.forClass(EntityId.class); + ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class); + ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(String.class); + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture()); - EmailPojo actual = new ObjectMapper().readValue(actualMsg.getData().getBytes(), EmailPojo.class); + assertEquals("SEND_EMAIL", typeCaptor.getValue()); + assertEquals(originator, originatorCaptor.getValue()); + assertEquals("oreo", metadataCaptor.getValue().getValue("username")); + assertNotSame(metaData, metadataCaptor.getValue()); + + EmailPojo actual = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), EmailPojo.class); EmailPojo expected = new EmailPojo.EmailPojoBuilder() .from("test@mail.org") diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java index 7f864ba109..1f75ed364c 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java @@ -29,6 +29,7 @@ import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.msg.TbMsg; @@ -68,11 +69,14 @@ public class TbChangeOriginatorNodeTest { when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset)); node.onMsg(ctx, msg); - ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); - verify(ctx).tellNext(captor.capture()); - TbMsg actualMsg = captor.getValue(); - assertEquals(customerId, actualMsg.getOriginator()); - assertEquals(msg.getId(), actualMsg.getId()); + + ArgumentCaptor typeCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor originatorCaptor = ArgumentCaptor.forClass(EntityId.class); + ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class); + ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(String.class); + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture()); + + assertEquals(customerId, originatorCaptor.getValue()); } @Test @@ -92,11 +96,13 @@ public class TbChangeOriginatorNodeTest { when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset)); node.onMsg(ctx, msg); - ArgumentCaptor captor = ArgumentCaptor.forClass(TbMsg.class); - verify(ctx).spawn(captor.capture()); - TbMsg actualMsg = captor.getValue(); - assertEquals(customerId, actualMsg.getOriginator()); - assertEquals(msg.getId(), actualMsg.getId()); + ArgumentCaptor typeCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor originatorCaptor = ArgumentCaptor.forClass(EntityId.class); + ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class); + ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(String.class); + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture()); + + assertEquals(customerId, originatorCaptor.getValue()); } @Test