Add tests for onActivity() in integration activity manager, add concurrency fix in onActivity()
This commit is contained in:
parent
47c72fb4e5
commit
a31bd5e2c5
@ -55,15 +55,20 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatNoException;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.isNull;
|
||||
import static org.mockito.Mockito.doCallRealMethod;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoInteractions;
|
||||
@ -78,6 +83,182 @@ public class IntegrationActivityManagerTest {
|
||||
@Mock
|
||||
private DefaultPlatformIntegrationService integrationServiceMock;
|
||||
|
||||
@Test
|
||||
void givenKeyIsNull_whenOnActivity_thenShouldNotRecordAndShouldNotReport() {
|
||||
// GIVEN
|
||||
ConcurrentMap<IntegrationActivityKey, Object> states = new ConcurrentHashMap<>();
|
||||
ReflectionTestUtils.setField(integrationServiceMock, "states", states);
|
||||
|
||||
IntegrationActivityKey key = null;
|
||||
|
||||
doCallRealMethod().when(integrationServiceMock).onActivity(eq(key), anyLong());
|
||||
|
||||
// WHEN
|
||||
integrationServiceMock.onActivity(key, 123L);
|
||||
|
||||
// THEN
|
||||
assertThat(states.isEmpty()).isTrue();
|
||||
verify(integrationServiceMock, never()).createNewState(any());
|
||||
verify(integrationServiceMock, never()).getStrategy();
|
||||
verify(integrationServiceMock, never()).reportActivity(any(), any(), anyLong(), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenNewActivity_whenOnActivity_thenShouldCreateNewStateAndRecord() {
|
||||
// GIVEN
|
||||
ConcurrentMap<IntegrationActivityKey, Object> states = new ConcurrentHashMap<>();
|
||||
ReflectionTestUtils.setField(integrationServiceMock, "states", states);
|
||||
|
||||
ActivityStrategy strategyMock = mock(ActivityStrategy.class);
|
||||
when(integrationServiceMock.getStrategy()).thenReturn(strategyMock);
|
||||
|
||||
var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID);
|
||||
when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>());
|
||||
|
||||
long activityTime = 123L;
|
||||
doCallRealMethod().when(integrationServiceMock).onActivity(key, activityTime);
|
||||
|
||||
when(integrationServiceMock.getLastRecordedTime(key)).thenCallRealMethod();
|
||||
|
||||
// WHEN
|
||||
integrationServiceMock.onActivity(key, activityTime);
|
||||
|
||||
// THEN
|
||||
assertThat(integrationServiceMock.getLastRecordedTime(key)).isEqualTo(activityTime);
|
||||
verify(integrationServiceMock).createNewState(key);
|
||||
verify(integrationServiceMock).getStrategy();
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenSubsequentActivities_whenOnActivity_thenShouldRecordBoth() {
|
||||
// GIVEN
|
||||
ConcurrentMap<IntegrationActivityKey, Object> states = new ConcurrentHashMap<>();
|
||||
ReflectionTestUtils.setField(integrationServiceMock, "states", states);
|
||||
|
||||
ActivityStrategy strategyMock = mock(ActivityStrategy.class);
|
||||
when(integrationServiceMock.getStrategy()).thenReturn(strategyMock);
|
||||
|
||||
var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID);
|
||||
when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>());
|
||||
|
||||
when(integrationServiceMock.getLastRecordedTime(key)).thenCallRealMethod();
|
||||
|
||||
// WHEN-THEN
|
||||
long firstActivityTime = 100L;
|
||||
doCallRealMethod().when(integrationServiceMock).onActivity(key, firstActivityTime);
|
||||
integrationServiceMock.onActivity(key, firstActivityTime);
|
||||
assertThat(integrationServiceMock.getLastRecordedTime(key)).isEqualTo(firstActivityTime);
|
||||
|
||||
long secondActivityTime = 123L;
|
||||
doCallRealMethod().when(integrationServiceMock).onActivity(key, secondActivityTime);
|
||||
integrationServiceMock.onActivity(key, secondActivityTime);
|
||||
assertThat(integrationServiceMock.getLastRecordedTime(key)).isEqualTo(secondActivityTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenActivityAndStrategySaysThatShouldNotReport_whenOnActivity_thenShouldNotReport() {
|
||||
// GIVEN
|
||||
ConcurrentMap<IntegrationActivityKey, Object> states = new ConcurrentHashMap<>();
|
||||
ReflectionTestUtils.setField(integrationServiceMock, "states", states);
|
||||
|
||||
ActivityStrategy strategyMock = mock(ActivityStrategy.class);
|
||||
when(strategyMock.onActivity()).thenReturn(false);
|
||||
|
||||
var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID);
|
||||
when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>());
|
||||
when(integrationServiceMock.getStrategy()).thenReturn(strategyMock);
|
||||
|
||||
long activityTime = 123L;
|
||||
doCallRealMethod().when(integrationServiceMock).onActivity(key, activityTime);
|
||||
|
||||
// WHEN
|
||||
integrationServiceMock.onActivity(key, activityTime);
|
||||
|
||||
// THEN
|
||||
verify(integrationServiceMock, never()).reportActivity(any(), any(), anyLong(), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenActivityAndStrategySaysThatShouldReport_whenOnActivity_thenShouldReport() {
|
||||
// GIVEN
|
||||
ConcurrentMap<IntegrationActivityKey, Object> states = new ConcurrentHashMap<>();
|
||||
ReflectionTestUtils.setField(integrationServiceMock, "states", states);
|
||||
|
||||
ActivityStrategy strategyMock = mock(ActivityStrategy.class);
|
||||
when(strategyMock.onActivity()).thenReturn(true);
|
||||
|
||||
var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID);
|
||||
when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>());
|
||||
when(integrationServiceMock.getStrategy()).thenReturn(strategyMock);
|
||||
|
||||
long activityTime = 123L;
|
||||
doCallRealMethod().when(integrationServiceMock).onActivity(key, activityTime);
|
||||
|
||||
// WHEN
|
||||
integrationServiceMock.onActivity(key, activityTime);
|
||||
|
||||
// THEN
|
||||
verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(activityTime), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenActivityAndStrategySaysThatShouldReportButLastRecordedTimeIsEqualToLastReportedTime_whenOnActivity_thenShouldNotReport() {
|
||||
// GIVEN
|
||||
ConcurrentMap<IntegrationActivityKey, Object> states = new ConcurrentHashMap<>();
|
||||
ReflectionTestUtils.setField(integrationServiceMock, "states", states);
|
||||
|
||||
ActivityStrategy strategyMock = mock(ActivityStrategy.class);
|
||||
when(strategyMock.onActivity()).thenReturn(true);
|
||||
|
||||
var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID);
|
||||
when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>());
|
||||
when(integrationServiceMock.getStrategy()).thenReturn(strategyMock);
|
||||
|
||||
// WHEN-THEN
|
||||
long firstActivityTime = 123L;
|
||||
doCallRealMethod().when(integrationServiceMock).onActivity(key, firstActivityTime);
|
||||
integrationServiceMock.onActivity(key, firstActivityTime);
|
||||
|
||||
ArgumentCaptor<ActivityReportCallback<IntegrationActivityKey>> firstCallbackCaptor = ArgumentCaptor.forClass(ActivityReportCallback.class);
|
||||
verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(firstActivityTime), firstCallbackCaptor.capture());
|
||||
firstCallbackCaptor.getValue().onSuccess(key, firstActivityTime);
|
||||
|
||||
long secondActivityTime = 100L;
|
||||
doCallRealMethod().when(integrationServiceMock).onActivity(key, secondActivityTime);
|
||||
integrationServiceMock.onActivity(key, secondActivityTime);
|
||||
|
||||
verify(integrationServiceMock, never()).reportActivity(eq(key), any(), eq(secondActivityTime), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenActivityAndStrategySaysThatShouldReportAndLastRecordedTimeIsGreaterThanLastReportedTime_whenOnActivity_thenShouldReport() {
|
||||
// GIVEN
|
||||
ConcurrentMap<IntegrationActivityKey, Object> states = new ConcurrentHashMap<>();
|
||||
ReflectionTestUtils.setField(integrationServiceMock, "states", states);
|
||||
|
||||
ActivityStrategy strategyMock = mock(ActivityStrategy.class);
|
||||
when(strategyMock.onActivity()).thenReturn(true);
|
||||
|
||||
var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID);
|
||||
when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>());
|
||||
when(integrationServiceMock.getStrategy()).thenReturn(strategyMock);
|
||||
|
||||
// WHEN-THEN
|
||||
long firstActivityTime = 123L;
|
||||
doCallRealMethod().when(integrationServiceMock).onActivity(key, firstActivityTime);
|
||||
integrationServiceMock.onActivity(key, firstActivityTime);
|
||||
|
||||
ArgumentCaptor<ActivityReportCallback<IntegrationActivityKey>> firstCallbackCaptor = ArgumentCaptor.forClass(ActivityReportCallback.class);
|
||||
verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(firstActivityTime), firstCallbackCaptor.capture());
|
||||
firstCallbackCaptor.getValue().onSuccess(key, firstActivityTime);
|
||||
|
||||
long secondActivityTime = 456L;
|
||||
doCallRealMethod().when(integrationServiceMock).onActivity(key, secondActivityTime);
|
||||
integrationServiceMock.onActivity(key, secondActivityTime);
|
||||
|
||||
verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(secondActivityTime), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenKeyAndTimeToReport_whenReportingActivity_thenShouldCorrectlyReportActivity() {
|
||||
// GIVEN
|
||||
@ -148,12 +329,14 @@ public class IntegrationActivityManagerTest {
|
||||
.build();
|
||||
TransportProtos.PostTelemetryMsg postTelemetryMsg = TransportProtos.PostTelemetryMsg.getDefaultInstance();
|
||||
doCallRealMethod().when(integrationServiceMock).process(sessionInfo, postTelemetryMsg, null);
|
||||
long expectedTime = 123L;
|
||||
when(integrationServiceMock.getCurrentTimeMillis()).thenReturn(expectedTime);
|
||||
|
||||
// WHEN
|
||||
integrationServiceMock.process(sessionInfo, postTelemetryMsg, null);
|
||||
|
||||
// THEN
|
||||
verify(integrationServiceMock).onActivity(key);
|
||||
verify(integrationServiceMock).onActivity(key, expectedTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -172,11 +355,14 @@ public class IntegrationActivityManagerTest {
|
||||
doCallRealMethod().when(integrationServiceMock).process(sessionInfo, postAttributeMsg, null);
|
||||
doNothing().when(integrationServiceMock).sendToRuleEngine(any(), any(), any(), any(), any(), any(), any());
|
||||
|
||||
long activityTime = 123L;
|
||||
when(integrationServiceMock.getCurrentTimeMillis()).thenReturn(activityTime);
|
||||
|
||||
// WHEN
|
||||
integrationServiceMock.process(sessionInfo, postAttributeMsg, null);
|
||||
|
||||
// THEN
|
||||
verify(integrationServiceMock).onActivity(key);
|
||||
verify(integrationServiceMock).onActivity(key, activityTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@Slf4j
|
||||
public abstract class AbstractActivityManager<Key, Metadata> implements ActivityManager<Key> {
|
||||
@ -80,15 +81,17 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
|
||||
protected abstract void reportActivity(Key key, Metadata metadata, long timeToReport, ActivityReportCallback<Key> callback);
|
||||
|
||||
@Override
|
||||
public void onActivity(Key key) {
|
||||
public void onActivity(Key key, long newLastRecordedTime) {
|
||||
if (key == null) {
|
||||
log.error("Failed to process activity event: provided activity key is null.");
|
||||
return;
|
||||
}
|
||||
log.debug("Received activity event for key: [{}]", key);
|
||||
|
||||
long newLastRecordedTime = System.currentTimeMillis();
|
||||
var shouldReport = new AtomicBoolean(false);
|
||||
var lastRecordedTime = new AtomicLong();
|
||||
var lastReportedTime = new AtomicLong();
|
||||
|
||||
var activityStateWrapper = states.compute(key, (__, stateWrapper) -> {
|
||||
if (stateWrapper == null) {
|
||||
var newState = createNewState(key);
|
||||
@ -104,6 +107,8 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
|
||||
state.setLastRecordedTime(newLastRecordedTime);
|
||||
}
|
||||
shouldReport.set(stateWrapper.getStrategy().onActivity());
|
||||
lastRecordedTime.set(state.getLastRecordedTime());
|
||||
lastReportedTime.set(stateWrapper.getLastReportedTime());
|
||||
return stateWrapper;
|
||||
});
|
||||
|
||||
@ -111,12 +116,9 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
|
||||
return;
|
||||
}
|
||||
|
||||
var activityState = activityStateWrapper.getState();
|
||||
long lastRecordedTime = activityState.getLastRecordedTime();
|
||||
long lastReportedTime = activityStateWrapper.getLastReportedTime();
|
||||
if (shouldReport.get() && lastReportedTime < lastRecordedTime) {
|
||||
if (shouldReport.get() && lastReportedTime.get() < lastRecordedTime.get()) {
|
||||
log.debug("Going to report first activity event for key: [{}].", key);
|
||||
reportActivity(key, activityState.getMetadata(), lastRecordedTime, new ActivityReportCallback<>() {
|
||||
reportActivity(key, activityStateWrapper.getState().getMetadata(), lastRecordedTime.get(), new ActivityReportCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(Key key, long reportedTime) {
|
||||
updateLastReportedTime(key, reportedTime);
|
||||
@ -131,12 +133,7 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastRecordedTime(Key key) {
|
||||
ActivityStateWrapper stateWrapper = states.get(key);
|
||||
return stateWrapper == null ? 0L : stateWrapper.getState().getLastRecordedTime();
|
||||
}
|
||||
|
||||
private void onReportingPeriodEnd() {
|
||||
public void onReportingPeriodEnd() {
|
||||
log.debug("Going to end reporting period.");
|
||||
for (Map.Entry<Key, ActivityStateWrapper> entry : states.entrySet()) {
|
||||
var key = entry.getKey();
|
||||
@ -185,6 +182,12 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastRecordedTime(Key key) {
|
||||
ActivityStateWrapper stateWrapper = states.get(key);
|
||||
return stateWrapper == null ? 0L : stateWrapper.getState().getLastRecordedTime();
|
||||
}
|
||||
|
||||
private void updateLastReportedTime(Key key, long newLastReportedTime) {
|
||||
states.computeIfPresent(key, (__, stateWrapper) -> {
|
||||
stateWrapper.setLastReportedTime(Math.max(stateWrapper.getLastReportedTime(), newLastReportedTime));
|
||||
|
||||
@ -32,7 +32,9 @@ package org.thingsboard.server.common.transport.activity;
|
||||
|
||||
public interface ActivityManager<Key> {
|
||||
|
||||
void onActivity(Key key);
|
||||
void onActivity(Key key, long activityTimeMillis);
|
||||
|
||||
void onReportingPeriodEnd();
|
||||
|
||||
long getLastRecordedTime(Key key);
|
||||
|
||||
|
||||
@ -785,7 +785,7 @@ public class DefaultTransportService extends AbstractActivityManager<UUID, Trans
|
||||
}
|
||||
|
||||
private void recordActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
|
||||
onActivity(toSessionId(sessionInfo));
|
||||
onActivity(toSessionId(sessionInfo), getCurrentTimeMillis());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -183,12 +183,14 @@ public class TransportActivityManagerTest {
|
||||
.build();
|
||||
doCallRealMethod().when(transportServiceMock).recordActivity(sessionInfo);
|
||||
when(transportServiceMock.toSessionId(sessionInfo)).thenReturn(SESSION_ID);
|
||||
long expectedTime = 123L;
|
||||
when(transportServiceMock.getCurrentTimeMillis()).thenReturn(expectedTime);
|
||||
|
||||
// WHEN
|
||||
transportServiceMock.recordActivity(sessionInfo);
|
||||
|
||||
// THEN
|
||||
verify(transportServiceMock).onActivity(SESSION_ID);
|
||||
verify(transportServiceMock).onActivity(SESSION_ID, expectedTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user