diff --git a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java index e25f45a03c..59cca779f5 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java @@ -34,7 +34,6 @@ import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; -import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; @@ -286,28 +285,16 @@ public class DefaultTbClusterServiceTest { public void testPushMsgToRuleEngineWithTenantIdIsNullUuidAndEntityIsTenantUseQueueFromMsgIsTrue() { TbQueueProducer> tbREQueueProducer = mock(TbQueueProducer.class); TbQueueCallback callback = mock(TbQueueCallback.class); - TopicPartitionInfo tpi = mock(TopicPartitionInfo.class); TenantId tenantId = TenantId.fromUUID(UUID.fromString("3c8bd350-1239-4a3b-b9c3-4dd76f8e20f1")); TbMsg requestMsg = TbMsg.newMsg(DataConstants.HP_QUEUE_NAME, TbMsgType.REST_API_REQUEST, tenantId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); - TransportProtos.ToRuleEngineMsg toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder() - .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) - .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(requestMsg)).build(); - when(partitionService.resolve(any(ServiceType.class), any(String.class), any(TenantId.class), any(EntityId.class))).thenReturn(tpi); when(producerProvider.getRuleEngineMsgProducer()).thenReturn(tbREQueueProducer); clusterService.pushMsgToRuleEngine(TenantId.SYS_TENANT_ID, tenantId, requestMsg, true, callback); - verify(partitionService).resolve(ServiceType.TB_RULE_ENGINE, DataConstants.HP_QUEUE_NAME, tenantId, tenantId); verify(producerProvider).getRuleEngineMsgProducer(); - ArgumentCaptor> protoQueueMsgArgumentCaptor = ArgumentCaptor.forClass(TbProtoQueueMsg.class); - verify(tbREQueueProducer).send(eq(tpi), protoQueueMsgArgumentCaptor.capture(), eq(callback)); - TbProtoQueueMsg protoQueueMsgArgumentCaptorValue = protoQueueMsgArgumentCaptor.getValue(); - assertThat(protoQueueMsgArgumentCaptorValue.getKey()).isEqualTo(requestMsg.getId()); - assertThat(protoQueueMsgArgumentCaptorValue.getValue()).isEqualTo(toRuleEngineMsg); - assertThat(protoQueueMsgArgumentCaptorValue.getHeaders().getData()).isEqualTo(new DefaultTbQueueMsgHeaders().getData()); + verify(ruleEngineProducerService).sendToRuleEngine(tbREQueueProducer, tenantId, requestMsg, callback); } @Test @@ -326,38 +313,25 @@ public class DefaultTbClusterServiceTest { public void testPushMsgToRuleEngineWithTenantIdIsNotNullUuidUseQueueFromMsgIsTrue() { TbQueueProducer> tbREQueueProducer = mock(TbQueueProducer.class); TbQueueCallback callback = mock(TbQueueCallback.class); - TopicPartitionInfo tpi = mock(TopicPartitionInfo.class); TenantId tenantId = TenantId.fromUUID(UUID.fromString("3c8bd350-1239-4a3b-b9c3-4dd76f8e20f1")); DeviceId deviceId = new DeviceId(UUID.fromString("adbb9d41-3367-40fd-9e74-7dd7cc5d30cf")); DeviceProfile deviceProfile = new DeviceProfile(new DeviceProfileId(UUID.fromString("552f5d6d-0b2b-43e1-a7d2-a51cb2a96927"))); TbMsg requestMsg = TbMsg.newMsg(DataConstants.HP_QUEUE_NAME, TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); - TransportProtos.ToRuleEngineMsg toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder() - .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) - .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(requestMsg)).build(); when(deviceProfileCache.get(any(TenantId.class), any(DeviceId.class))).thenReturn(deviceProfile); - when(partitionService.resolve(any(ServiceType.class), any(String.class), any(TenantId.class), any(EntityId.class))).thenReturn(tpi); when(producerProvider.getRuleEngineMsgProducer()).thenReturn(tbREQueueProducer); clusterService.pushMsgToRuleEngine(tenantId, deviceId, requestMsg, true, callback); - verify(partitionService).resolve(ServiceType.TB_RULE_ENGINE, DataConstants.HP_QUEUE_NAME, tenantId, deviceId); verify(producerProvider).getRuleEngineMsgProducer(); - ArgumentCaptor> protoQueueMsgArgumentCaptor = ArgumentCaptor.forClass(TbProtoQueueMsg.class); - verify(tbREQueueProducer).send(eq(tpi), protoQueueMsgArgumentCaptor.capture(), eq(callback)); - TbProtoQueueMsg protoQueueMsgArgumentCaptorValue = protoQueueMsgArgumentCaptor.getValue(); - assertThat(protoQueueMsgArgumentCaptorValue.getKey()).isEqualTo(requestMsg.getId()); - assertThat(protoQueueMsgArgumentCaptorValue.getValue()).isEqualTo(toRuleEngineMsg); - assertThat(protoQueueMsgArgumentCaptorValue.getHeaders().getData()).isEqualTo(new DefaultTbQueueMsgHeaders().getData()); + verify(ruleEngineProducerService).sendToRuleEngine(tbREQueueProducer, tenantId, requestMsg, callback); } @Test public void testPushMsgToRuleEngineUseQueueFromMsgIsFalse() { TbQueueProducer> tbREQueueProducer = mock(TbQueueProducer.class); TbQueueCallback callback = mock(TbQueueCallback.class); - TopicPartitionInfo tpi = mock(TopicPartitionInfo.class); TenantId tenantId = TenantId.fromUUID(UUID.fromString("5377c8d0-26e5-4d81-84c6-4344043973c8")); DeviceId deviceId = new DeviceId(UUID.fromString("016c2abb-f46f-49f9-a83d-4d28b803cfe6")); @@ -365,25 +339,16 @@ public class DefaultTbClusterServiceTest { deviceProfile.setDefaultQueueName(DataConstants.MAIN_QUEUE_NAME); TbMsg requestMsg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); - TransportProtos.ToRuleEngineMsg toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder() - .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) - .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(requestMsg)).build(); - when(deviceProfileCache.get(any(TenantId.class), any(DeviceId.class))).thenReturn(deviceProfile); - when(partitionService.resolve(any(ServiceType.class), any(String.class), any(TenantId.class), any(EntityId.class))).thenReturn(tpi); when(producerProvider.getRuleEngineMsgProducer()).thenReturn(tbREQueueProducer); clusterService.pushMsgToRuleEngine(tenantId, deviceId, requestMsg, false, callback); - verify(partitionService).resolve(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId, deviceId); verify(producerProvider).getRuleEngineMsgProducer(); - ArgumentCaptor> protoQueueMsgArgumentCaptor = ArgumentCaptor.forClass(TbProtoQueueMsg.class); - verify(tbREQueueProducer).send(eq(tpi), protoQueueMsgArgumentCaptor.capture(), eq(callback)); - TbProtoQueueMsg protoQueueMsgArgumentCaptorValue = protoQueueMsgArgumentCaptor.getValue(); - assertThat(protoQueueMsgArgumentCaptorValue.getKey()).isEqualTo(requestMsg.getId()); - assertThat(protoQueueMsgArgumentCaptorValue.getValue()).isEqualTo(toRuleEngineMsg); - assertThat(protoQueueMsgArgumentCaptorValue.getHeaders().getData()).isEqualTo(new DefaultTbQueueMsgHeaders().getData()); + TbMsg expectedMsg = TbMsg.transformMsgQueueName(requestMsg, DataConstants.MAIN_QUEUE_NAME); + ArgumentCaptor actualMsg = ArgumentCaptor.forClass(TbMsg.class); + verify(ruleEngineProducerService).sendToRuleEngine(eq(tbREQueueProducer), eq(tenantId), actualMsg.capture(), eq(callback)); + assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg); } protected Queue createTestQueue() {