fixed tests

This commit is contained in:
irynamatveieva 2024-07-31 16:08:55 +03:00
parent 649a8b65fd
commit 37e1a40b9f

View File

@ -34,7 +34,6 @@ import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.AssetProfileId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.QueueId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
@ -286,28 +285,16 @@ public class DefaultTbClusterServiceTest {
public void testPushMsgToRuleEngineWithTenantIdIsNullUuidAndEntityIsTenantUseQueueFromMsgIsTrue() {
TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> tbREQueueProducer = mock(TbQueueProducer.class);
TbQueueCallback callback = mock(TbQueueCallback.class);
TopicPartitionInfo tpi = mock(TopicPartitionInfo.class);
TenantId tenantId = TenantId.fromUUID(UUID.fromString("3c8bd350-1239-4a3b-b9c3-4dd76f8e20f1"));
TbMsg requestMsg = TbMsg.newMsg(DataConstants.HP_QUEUE_NAME, TbMsgType.REST_API_REQUEST, tenantId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
TransportProtos.ToRuleEngineMsg toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(requestMsg)).build();
when(partitionService.resolve(any(ServiceType.class), any(String.class), any(TenantId.class), any(EntityId.class))).thenReturn(tpi);
when(producerProvider.getRuleEngineMsgProducer()).thenReturn(tbREQueueProducer);
clusterService.pushMsgToRuleEngine(TenantId.SYS_TENANT_ID, tenantId, requestMsg, true, callback);
verify(partitionService).resolve(ServiceType.TB_RULE_ENGINE, DataConstants.HP_QUEUE_NAME, tenantId, tenantId);
verify(producerProvider).getRuleEngineMsgProducer();
ArgumentCaptor<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> protoQueueMsgArgumentCaptor = ArgumentCaptor.forClass(TbProtoQueueMsg.class);
verify(tbREQueueProducer).send(eq(tpi), protoQueueMsgArgumentCaptor.capture(), eq(callback));
TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg> protoQueueMsgArgumentCaptorValue = protoQueueMsgArgumentCaptor.getValue();
assertThat(protoQueueMsgArgumentCaptorValue.getKey()).isEqualTo(requestMsg.getId());
assertThat(protoQueueMsgArgumentCaptorValue.getValue()).isEqualTo(toRuleEngineMsg);
assertThat(protoQueueMsgArgumentCaptorValue.getHeaders().getData()).isEqualTo(new DefaultTbQueueMsgHeaders().getData());
verify(ruleEngineProducerService).sendToRuleEngine(tbREQueueProducer, tenantId, requestMsg, callback);
}
@Test
@ -326,38 +313,25 @@ public class DefaultTbClusterServiceTest {
public void testPushMsgToRuleEngineWithTenantIdIsNotNullUuidUseQueueFromMsgIsTrue() {
TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> tbREQueueProducer = mock(TbQueueProducer.class);
TbQueueCallback callback = mock(TbQueueCallback.class);
TopicPartitionInfo tpi = mock(TopicPartitionInfo.class);
TenantId tenantId = TenantId.fromUUID(UUID.fromString("3c8bd350-1239-4a3b-b9c3-4dd76f8e20f1"));
DeviceId deviceId = new DeviceId(UUID.fromString("adbb9d41-3367-40fd-9e74-7dd7cc5d30cf"));
DeviceProfile deviceProfile = new DeviceProfile(new DeviceProfileId(UUID.fromString("552f5d6d-0b2b-43e1-a7d2-a51cb2a96927")));
TbMsg requestMsg = TbMsg.newMsg(DataConstants.HP_QUEUE_NAME, TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
TransportProtos.ToRuleEngineMsg toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(requestMsg)).build();
when(deviceProfileCache.get(any(TenantId.class), any(DeviceId.class))).thenReturn(deviceProfile);
when(partitionService.resolve(any(ServiceType.class), any(String.class), any(TenantId.class), any(EntityId.class))).thenReturn(tpi);
when(producerProvider.getRuleEngineMsgProducer()).thenReturn(tbREQueueProducer);
clusterService.pushMsgToRuleEngine(tenantId, deviceId, requestMsg, true, callback);
verify(partitionService).resolve(ServiceType.TB_RULE_ENGINE, DataConstants.HP_QUEUE_NAME, tenantId, deviceId);
verify(producerProvider).getRuleEngineMsgProducer();
ArgumentCaptor<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> protoQueueMsgArgumentCaptor = ArgumentCaptor.forClass(TbProtoQueueMsg.class);
verify(tbREQueueProducer).send(eq(tpi), protoQueueMsgArgumentCaptor.capture(), eq(callback));
TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg> protoQueueMsgArgumentCaptorValue = protoQueueMsgArgumentCaptor.getValue();
assertThat(protoQueueMsgArgumentCaptorValue.getKey()).isEqualTo(requestMsg.getId());
assertThat(protoQueueMsgArgumentCaptorValue.getValue()).isEqualTo(toRuleEngineMsg);
assertThat(protoQueueMsgArgumentCaptorValue.getHeaders().getData()).isEqualTo(new DefaultTbQueueMsgHeaders().getData());
verify(ruleEngineProducerService).sendToRuleEngine(tbREQueueProducer, tenantId, requestMsg, callback);
}
@Test
public void testPushMsgToRuleEngineUseQueueFromMsgIsFalse() {
TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> tbREQueueProducer = mock(TbQueueProducer.class);
TbQueueCallback callback = mock(TbQueueCallback.class);
TopicPartitionInfo tpi = mock(TopicPartitionInfo.class);
TenantId tenantId = TenantId.fromUUID(UUID.fromString("5377c8d0-26e5-4d81-84c6-4344043973c8"));
DeviceId deviceId = new DeviceId(UUID.fromString("016c2abb-f46f-49f9-a83d-4d28b803cfe6"));
@ -365,25 +339,16 @@ public class DefaultTbClusterServiceTest {
deviceProfile.setDefaultQueueName(DataConstants.MAIN_QUEUE_NAME);
TbMsg requestMsg = TbMsg.newMsg(TbMsgType.REST_API_REQUEST, deviceId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
TransportProtos.ToRuleEngineMsg toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(requestMsg)).build();
when(deviceProfileCache.get(any(TenantId.class), any(DeviceId.class))).thenReturn(deviceProfile);
when(partitionService.resolve(any(ServiceType.class), any(String.class), any(TenantId.class), any(EntityId.class))).thenReturn(tpi);
when(producerProvider.getRuleEngineMsgProducer()).thenReturn(tbREQueueProducer);
clusterService.pushMsgToRuleEngine(tenantId, deviceId, requestMsg, false, callback);
verify(partitionService).resolve(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId, deviceId);
verify(producerProvider).getRuleEngineMsgProducer();
ArgumentCaptor<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> protoQueueMsgArgumentCaptor = ArgumentCaptor.forClass(TbProtoQueueMsg.class);
verify(tbREQueueProducer).send(eq(tpi), protoQueueMsgArgumentCaptor.capture(), eq(callback));
TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg> protoQueueMsgArgumentCaptorValue = protoQueueMsgArgumentCaptor.getValue();
assertThat(protoQueueMsgArgumentCaptorValue.getKey()).isEqualTo(requestMsg.getId());
assertThat(protoQueueMsgArgumentCaptorValue.getValue()).isEqualTo(toRuleEngineMsg);
assertThat(protoQueueMsgArgumentCaptorValue.getHeaders().getData()).isEqualTo(new DefaultTbQueueMsgHeaders().getData());
TbMsg expectedMsg = TbMsg.transformMsgQueueName(requestMsg, DataConstants.MAIN_QUEUE_NAME);
ArgumentCaptor<TbMsg> actualMsg = ArgumentCaptor.forClass(TbMsg.class);
verify(ruleEngineProducerService).sendToRuleEngine(eq(tbREQueueProducer), eq(tenantId), actualMsg.capture(), eq(callback));
assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("ctx").isEqualTo(expectedMsg);
}
protected Queue createTestQueue() {