Added test to verify attribute and RPC subscriptions removal on session overflow.
Signed-off-by: Oleksandra Matviienko <al.zzzeebra@gmail.com>
This commit is contained in:
parent
ba840c6543
commit
76155aca75
@ -489,7 +489,7 @@ public class ActorSystemContext {
|
|||||||
|
|
||||||
@Value("${actors.session.max_concurrent_sessions_per_device:1}")
|
@Value("${actors.session.max_concurrent_sessions_per_device:1}")
|
||||||
@Getter
|
@Getter
|
||||||
private long maxConcurrentSessionsPerDevice;
|
private int maxConcurrentSessionsPerDevice;
|
||||||
|
|
||||||
@Value("${actors.session.sync.timeout:10000}")
|
@Value("${actors.session.sync.timeout:10000}")
|
||||||
@Getter
|
@Getter
|
||||||
|
|||||||
@ -126,8 +126,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
|
|||||||
final TenantId tenantId;
|
final TenantId tenantId;
|
||||||
final DeviceId deviceId;
|
final DeviceId deviceId;
|
||||||
final LinkedHashMapRemoveEldest<UUID, SessionInfoMetaData> sessions;
|
final LinkedHashMapRemoveEldest<UUID, SessionInfoMetaData> sessions;
|
||||||
private final Map<UUID, SessionInfo> attributeSubscriptions;
|
final Map<UUID, SessionInfo> attributeSubscriptions;
|
||||||
private final Map<UUID, SessionInfo> rpcSubscriptions;
|
final Map<UUID, SessionInfo> rpcSubscriptions;
|
||||||
private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
|
private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
|
||||||
private final boolean rpcSequential;
|
private final boolean rpcSequential;
|
||||||
private final RpcSubmitStrategy rpcSubmitStrategy;
|
private final RpcSubmitStrategy rpcSubmitStrategy;
|
||||||
|
|||||||
@ -17,22 +17,27 @@ package org.thingsboard.server.actors.device;
|
|||||||
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
import org.thingsboard.common.util.LinkedHashMapRemoveEldest;
|
import org.thingsboard.common.util.LinkedHashMapRemoveEldest;
|
||||||
import org.thingsboard.server.actors.ActorSystemContext;
|
import org.thingsboard.server.actors.ActorSystemContext;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.dao.device.DeviceService;
|
import org.thingsboard.server.dao.device.DeviceService;
|
||||||
|
import org.thingsboard.server.service.transport.TbCoreToTransportService;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
import static org.hamcrest.CoreMatchers.notNullValue;
|
import static org.hamcrest.CoreMatchers.notNullValue;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.mockito.BDDMockito.willReturn;
|
import static org.mockito.BDDMockito.willReturn;
|
||||||
|
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
public class DeviceActorMessageProcessorTest {
|
public class DeviceActorMessageProcessorTest {
|
||||||
|
|
||||||
public static final long MAX_CONCURRENT_SESSIONS_PER_DEVICE = 10L;
|
public static final int MAX_CONCURRENT_SESSIONS_PER_DEVICE = 10;
|
||||||
ActorSystemContext systemContext;
|
ActorSystemContext systemContext;
|
||||||
DeviceService deviceService;
|
DeviceService deviceService;
|
||||||
TenantId tenantId = TenantId.SYS_TENANT_ID;
|
TenantId tenantId = TenantId.SYS_TENANT_ID;
|
||||||
@ -47,6 +52,7 @@ public class DeviceActorMessageProcessorTest {
|
|||||||
willReturn(MAX_CONCURRENT_SESSIONS_PER_DEVICE).given(systemContext).getMaxConcurrentSessionsPerDevice();
|
willReturn(MAX_CONCURRENT_SESSIONS_PER_DEVICE).given(systemContext).getMaxConcurrentSessionsPerDevice();
|
||||||
willReturn(deviceService).given(systemContext).getDeviceService();
|
willReturn(deviceService).given(systemContext).getDeviceService();
|
||||||
processor = new DeviceActorMessageProcessor(systemContext, tenantId, deviceId);
|
processor = new DeviceActorMessageProcessor(systemContext, tenantId, deviceId);
|
||||||
|
willReturn(mock(TbCoreToTransportService.class)).given(systemContext).getTbCoreToTransportService();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -55,4 +61,26 @@ public class DeviceActorMessageProcessorTest {
|
|||||||
assertThat(processor.sessions.getMaxEntries(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE));
|
assertThat(processor.sessions.getMaxEntries(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE));
|
||||||
assertThat(processor.sessions.getRemovalConsumer(), notNullValue());
|
assertThat(processor.sessions.getRemovalConsumer(), notNullValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenFullSessionMap_whenSessionOverflow_thenShouldDeleteAttributeAndRPCSubscriptions() {
|
||||||
|
//givenFullSessionMap
|
||||||
|
for (int i = 0; i < MAX_CONCURRENT_SESSIONS_PER_DEVICE; i++) {
|
||||||
|
UUID sessionID = UUID.randomUUID();
|
||||||
|
processor.sessions.put(sessionID, Mockito.mock(SessionInfoMetaData.class, RETURNS_DEEP_STUBS));
|
||||||
|
processor.attributeSubscriptions.put(sessionID, Mockito.mock(SessionInfo.class));
|
||||||
|
processor.rpcSubscriptions.put(sessionID, Mockito.mock(SessionInfo.class));
|
||||||
|
}
|
||||||
|
assertThat(processor.sessions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE));
|
||||||
|
assertThat(processor.attributeSubscriptions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE));
|
||||||
|
assertThat(processor.rpcSubscriptions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE));
|
||||||
|
|
||||||
|
//add one more
|
||||||
|
processor.sessions.put(UUID.randomUUID(), Mockito.mock(SessionInfoMetaData.class));
|
||||||
|
|
||||||
|
assertThat(processor.sessions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE));
|
||||||
|
assertThat(processor.attributeSubscriptions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE-1));
|
||||||
|
assertThat(processor.rpcSubscriptions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE-1));
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user