Add mock test for new and a part of exisiting logic in core consumer; make changes according to review comments

This commit is contained in:
Dmytro Skarzhynets 2023-09-20 14:38:37 +03:00
parent 1303975c05
commit 5d120a7c23
6 changed files with 297 additions and 19 deletions

View File

@ -601,7 +601,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
stateService.onQueueMsg(deviceStateServiceMsg, callback);
}
private void forwardToStateService(TransportProtos.DeviceConnectProto deviceConnectMsg, TbCallback callback) {
void forwardToStateService(TransportProtos.DeviceConnectProto deviceConnectMsg, TbCallback callback) {
var tenantId = toTenantId(deviceConnectMsg.getTenantIdMSB(), deviceConnectMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceConnectMsg.getDeviceIdMSB(), deviceConnectMsg.getDeviceIdLSB()));
try {
@ -613,7 +613,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
}
}
private void forwardToStateService(TransportProtos.DeviceActivityProto deviceActivityMsg, TbCallback callback) {
void forwardToStateService(TransportProtos.DeviceActivityProto deviceActivityMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceActivityMsg);
}
@ -628,7 +628,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
}
}
private void forwardToStateService(TransportProtos.DeviceDisconnectProto deviceDisconnectMsg, TbCallback callback) {
void forwardToStateService(TransportProtos.DeviceDisconnectProto deviceDisconnectMsg, TbCallback callback) {
var tenantId = toTenantId(deviceDisconnectMsg.getTenantIdMSB(), deviceDisconnectMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceDisconnectMsg.getDeviceIdMSB(), deviceDisconnectMsg.getDeviceIdLSB()));
try {
@ -640,7 +640,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
}
}
private void forwardToStateService(TransportProtos.DeviceInactivityProto deviceInactivityMsg, TbCallback callback) {
void forwardToStateService(TransportProtos.DeviceInactivityProto deviceInactivityMsg, TbCallback callback) {
var tenantId = toTenantId(deviceInactivityMsg.getTenantIdMSB(), deviceInactivityMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceInactivityMsg.getDeviceIdMSB(), deviceInactivityMsg.getDeviceIdLSB()));
try {

View File

@ -554,7 +554,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
.build());
}
private boolean cleanDeviceStateIfBelongsToExternalPartition(TenantId tenantId, final DeviceId deviceId) {
boolean cleanDeviceStateIfBelongsToExternalPartition(TenantId tenantId, final DeviceId deviceId) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
boolean cleanup = !partitionedEntities.containsKey(tpi);
if (cleanup) {

View File

@ -0,0 +1,281 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.service.state.DeviceStateService;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@ExtendWith(MockitoExtension.class)
public class DefaultTbCoreConsumerServiceTest {
@Mock
private DeviceStateService stateServiceMock;
@Mock
private TbCoreConsumerStats statsMock;
@Mock
private TbCallback tbCallbackMock;
@Mock
private DefaultTbCoreConsumerService defaultTbCoreConsumerServiceMock;
@BeforeEach
public void setup() {
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stateService", stateServiceMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock);
}
@Test
public void givenConnectMsg_whenForwardToStateService_thenOnDeviceConnectAndCallbackAreCalled() {
// GIVEN
var tenantId = new TenantId(UUID.randomUUID());
var deviceId = new DeviceId(UUID.randomUUID());
var time = System.currentTimeMillis();
var connectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastConnectTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(connectMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(connectMsg, tbCallbackMock);
// THEN
then(stateServiceMock).should(times(1)).onDeviceConnect(tenantId, deviceId, time);
then(tbCallbackMock).should(times(1)).onSuccess();
}
@Test
public void givenOnDeviceConnectThrowsException_whenForwardToStateService_thenOnlyCallbackOnFailureIsCalled() {
// GIVEN
var tenantId = new TenantId(UUID.randomUUID());
var deviceId = new DeviceId(UUID.randomUUID());
var time = System.currentTimeMillis();
var connectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastConnectTime(time)
.build();
var runtimeException = new RuntimeException("Something bad happened!");
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(connectMsg, tbCallbackMock);
doThrow(runtimeException).when(stateServiceMock).onDeviceConnect(tenantId, deviceId, time);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(connectMsg, tbCallbackMock);
// THEN
then(tbCallbackMock).should(never()).onSuccess();
then(tbCallbackMock).should(times(1)).onFailure(runtimeException);
}
@Test
public void givenActivityMsgAndStatsAreEnabled_whenForwardToStateService_thenOnDeviceActivityAndCallbackAreCalled() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", true);
var tenantId = new TenantId(UUID.randomUUID());
var deviceId = new DeviceId(UUID.randomUUID());
var time = System.currentTimeMillis();
var activityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastActivityTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(activityMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(activityMsg, tbCallbackMock);
// THEN
then(statsMock).should(times(1)).log(activityMsg);
then(stateServiceMock).should(times(1)).onDeviceActivity(tenantId, deviceId, time);
then(tbCallbackMock).should(times(1)).onSuccess();
}
@Test
public void givenOnDeviceActivityThrowsException_whenForwardToStateService_thenOnlyCallbackOnFailureIsCalled() {
// GIVEN
var tenantId = new TenantId(UUID.randomUUID());
var deviceId = new DeviceId(UUID.randomUUID());
var time = System.currentTimeMillis();
var activityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastActivityTime(time)
.build();
var runtimeException = new RuntimeException("Something bad happened!");
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(activityMsg, tbCallbackMock);
doThrow(runtimeException).when(stateServiceMock).onDeviceActivity(tenantId, deviceId, time);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(activityMsg, tbCallbackMock);
// THEN
then(tbCallbackMock).should(never()).onSuccess();
var exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
then(tbCallbackMock).should(times(1)).onFailure(exceptionCaptor.capture());
assertThat(exceptionCaptor.getValue())
.isInstanceOf(RuntimeException.class)
.hasMessage("Failed to update device activity for device [" + deviceId.getId() + "]!");
}
@Test
public void givenDisconnectMsg_whenForwardToStateService_thenOnDeviceDisconnectAndCallbackAreCalled() {
// GIVEN
var tenantId = new TenantId(UUID.randomUUID());
var deviceId = new DeviceId(UUID.randomUUID());
var time = System.currentTimeMillis();
var disconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastDisconnectTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(disconnectMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(disconnectMsg, tbCallbackMock);
// THEN
then(stateServiceMock).should(times(1)).onDeviceDisconnect(tenantId, deviceId, time);
then(tbCallbackMock).should(times(1)).onSuccess();
}
@Test
public void givenOnDeviceDisconnectThrowsException_whenForwardToStateService_thenOnlyCallbackOnFailureIsCalled() {
// GIVEN
var tenantId = new TenantId(UUID.randomUUID());
var deviceId = new DeviceId(UUID.randomUUID());
var time = System.currentTimeMillis();
var disconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastDisconnectTime(time)
.build();
var runtimeException = new RuntimeException("Something bad happened!");
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(disconnectMsg, tbCallbackMock);
doThrow(runtimeException).when(stateServiceMock).onDeviceDisconnect(tenantId, deviceId, time);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(disconnectMsg, tbCallbackMock);
// THEN
then(tbCallbackMock).should(never()).onSuccess();
then(tbCallbackMock).should(times(1)).onFailure(runtimeException);
}
@Test
public void givenInactivityMsg_whenForwardToStateService_thenOnDeviceInactivityAndCallbackAreCalled() {
// GIVEN
var tenantId = new TenantId(UUID.randomUUID());
var deviceId = new DeviceId(UUID.randomUUID());
var time = System.currentTimeMillis();
var inactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastInactivityTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityMsg, tbCallbackMock);
// THEN
then(stateServiceMock).should(times(1)).onDeviceInactivity(tenantId, deviceId, time);
then(tbCallbackMock).should(times(1)).onSuccess();
}
@Test
public void givenOnDeviceInactivityThrowsException_whenForwardToStateService_thenOnlyCallbackOnFailureIsCalled() {
// GIVEN
var tenantId = new TenantId(UUID.randomUUID());
var deviceId = new DeviceId(UUID.randomUUID());
var time = System.currentTimeMillis();
var inactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastInactivityTime(time)
.build();
var runtimeException = new RuntimeException("Something bad happened!");
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityMsg, tbCallbackMock);
doThrow(runtimeException).when(stateServiceMock).onDeviceInactivity(tenantId, deviceId, time);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityMsg, tbCallbackMock);
// THEN
then(tbCallbackMock).should(never()).onSuccess();
then(tbCallbackMock).should(times(1)).onFailure(runtimeException);
}
}

View File

@ -60,14 +60,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -125,15 +124,13 @@ public class DefaultDeviceStateServiceTest {
@Test
public void givenDeviceBelongsToExternalPartition_whenOnDeviceInactivity_thenCleansStateAndDoesNotReportInactivity() {
// GIVEN
service.deviceStates.put(deviceId, DeviceStateData.builder().build());
given(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId))
.willReturn(TopicPartitionInfo.builder().build());
doReturn(true).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
// WHEN
service.onDeviceInactivity(tenantId, deviceId, System.currentTimeMillis());
// THEN
assertThat(service.deviceStates).isEmpty();
then(service).should(times(1)).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
then(service).should(never()).fetchDeviceStateDataUsingSeparateRequests(deviceId);
then(clusterService).shouldHaveNoInteractions();
then(notificationRuleProcessor).shouldHaveNoInteractions();
@ -143,11 +140,6 @@ public class DefaultDeviceStateServiceTest {
@Test
public void givenDeviceBelongsToMyPartition_whenOnDeviceInactivity_thenReportsInactivity() {
// GIVEN
try {
initStateService(10000);
} catch (InterruptedException e) {
fail("Device state service failed to initialize!");
}
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
@ -155,6 +147,8 @@ public class DefaultDeviceStateServiceTest {
.metaData(new TbMsgMetaData())
.build();
doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
service.deviceStates.put(deviceId, deviceStateData);
long lastInactivityTime = System.currentTimeMillis();
@ -196,8 +190,6 @@ public class DefaultDeviceStateServiceTest {
.metaData(new TbMsgMetaData())
.build();
given(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId)).willReturn(tpi);
// WHEN
service.updateInactivityStateIfExpired(System.currentTimeMillis(), deviceId, deviceStateData);

View File

@ -60,7 +60,7 @@ import java.util.Set;
)
public class TbDeviceStateNode implements TbNode {
private static final Set<TbMsgType> SUPPORTED_EVENTS = Set.of(
static final Set<TbMsgType> SUPPORTED_EVENTS = Set.of(
TbMsgType.CONNECT_EVENT, TbMsgType.ACTIVITY_EVENT, TbMsgType.DISCONNECT_EVENT, TbMsgType.INACTIVITY_EVENT
);

View File

@ -41,6 +41,7 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Stream;
@ -97,6 +98,10 @@ public class TbDeviceStateNodeTest {
// THEN
assertThat(config.getEvent()).isEqualTo(TbMsgType.ACTIVITY_EVENT);
assertThat(TbDeviceStateNode.SUPPORTED_EVENTS).isEqualTo(Set.of(
TbMsgType.CONNECT_EVENT, TbMsgType.ACTIVITY_EVENT,
TbMsgType.DISCONNECT_EVENT, TbMsgType.INACTIVITY_EVENT
));
}
@Test