diff --git a/application/src/test/java/org/thingsboard/server/service/integration/IntegrationActivityManagerTest.java b/application/src/test/java/org/thingsboard/server/service/integration/IntegrationActivityManagerTest.java index c63b267fbf..3cb40d3cb3 100644 --- a/application/src/test/java/org/thingsboard/server/service/integration/IntegrationActivityManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/integration/IntegrationActivityManagerTest.java @@ -70,8 +70,10 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -259,6 +261,387 @@ public class IntegrationActivityManagerTest { verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(secondActivityTime), any()); } + @Test + void givenUpdatedStateIsNullAndHasUnreportedEvent_whenOnReportingPeriodEnd_thenShouldRemoveStateAndReportEvent() { + // GIVEN + ConcurrentMap states = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(integrationServiceMock, "states", states); + + var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID); + ActivityStrategy strategyMock = mock(ActivityStrategy.class); + + // first reported event + when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>()); + when(strategyMock.onActivity()).thenReturn(true); + when(integrationServiceMock.getStrategy()).thenReturn(strategyMock); + + long firstActivityTime = 123L; + doCallRealMethod().when(integrationServiceMock).onActivity(key, firstActivityTime); + integrationServiceMock.onActivity(key, firstActivityTime); + + ArgumentCaptor> firstCallbackCaptor = ArgumentCaptor.forClass(ActivityReportCallback.class); + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(firstActivityTime), firstCallbackCaptor.capture()); + firstCallbackCaptor.getValue().onSuccess(key, firstActivityTime); + + // second unreported event + long secondActivityTime = 456L; + when(strategyMock.onActivity()).thenReturn(false); + doCallRealMethod().when(integrationServiceMock).onActivity(key, secondActivityTime); + integrationServiceMock.onActivity(key, secondActivityTime); + + verify(integrationServiceMock, never()).reportActivity(eq(key), isNull(), eq(secondActivityTime), any()); + + verify(strategyMock, times(2)).onActivity(); + + long lastRecordedTime = secondActivityTime; + // lastReportedTime = firstActivityTime = 123L; + ActivityState updatedState = null; + + when(integrationServiceMock.updateState(eq(key), any())).thenReturn(updatedState); + + // WHEN + doCallRealMethod().when(integrationServiceMock).onReportingPeriodEnd(); + integrationServiceMock.onReportingPeriodEnd(); + + // THEN + assertThat(states).isEmpty(); + verify(integrationServiceMock, never()).hasExpired(anyLong()); + verifyNoMoreInteractions(strategyMock); + verify(integrationServiceMock, never()).onStateExpiry(any(), any()); + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(lastRecordedTime), any()); + } + + @Test + void givenUpdatedStateIsNullAndDoesntHaveUnreportedEvent_whenOnReportingPeriodEnd_thenShouldRemoveStateAndShouldNotAnything() { + // GIVEN + ConcurrentMap states = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(integrationServiceMock, "states", states); + + var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID); + ActivityStrategy strategyMock = mock(ActivityStrategy.class); + + // reported event + when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>()); + when(strategyMock.onActivity()).thenReturn(true); + when(integrationServiceMock.getStrategy()).thenReturn(strategyMock); + + long activityTime = 123L; + doCallRealMethod().when(integrationServiceMock).onActivity(key, activityTime); + integrationServiceMock.onActivity(key, activityTime); + + ArgumentCaptor> firstCallbackCaptor = ArgumentCaptor.forClass(ActivityReportCallback.class); + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(activityTime), firstCallbackCaptor.capture()); + firstCallbackCaptor.getValue().onSuccess(key, activityTime); + + verify(strategyMock).onActivity(); + + long lastRecordedTime = activityTime; + // lastReportedTime = lastRecordedTime = 123L; + ActivityState updatedState = null; + + when(integrationServiceMock.updateState(eq(key), any())).thenReturn(updatedState); + + // WHEN + doCallRealMethod().when(integrationServiceMock).onReportingPeriodEnd(); + integrationServiceMock.onReportingPeriodEnd(); + + // THEN + assertThat(states).isEmpty(); + verify(integrationServiceMock, never()).hasExpired(anyLong()); + verifyNoMoreInteractions(strategyMock); + verify(integrationServiceMock, never()).onStateExpiry(any(), any()); + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(lastRecordedTime), any()); + } + + @Test + void givenUpdatedStateIsNotNullAndHasNotExpiredAndStrategySaysThatShouldReportAndHasUnreportedEvents_whenOnReportingPeriodEnd_thenShouldReportEventUsingUpdatedState() { + // GIVEN + ConcurrentMap states = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(integrationServiceMock, "states", states); + + var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID); + ActivityStrategy strategyMock = mock(ActivityStrategy.class); + + // first reported event + when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>()); + when(strategyMock.onActivity()).thenReturn(true); + when(integrationServiceMock.getStrategy()).thenReturn(strategyMock); + + long firstActivityTime = 123L; + doCallRealMethod().when(integrationServiceMock).onActivity(key, firstActivityTime); + integrationServiceMock.onActivity(key, firstActivityTime); + + ArgumentCaptor> firstCallbackCaptor = ArgumentCaptor.forClass(ActivityReportCallback.class); + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(firstActivityTime), firstCallbackCaptor.capture()); + firstCallbackCaptor.getValue().onSuccess(key, firstActivityTime); + + // second unreported event + long secondActivityTime = 456L; + when(strategyMock.onActivity()).thenReturn(false); + doCallRealMethod().when(integrationServiceMock).onActivity(key, secondActivityTime); + integrationServiceMock.onActivity(key, secondActivityTime); + + verify(integrationServiceMock, never()).reportActivity(eq(key), isNull(), eq(secondActivityTime), any()); + + verify(strategyMock, times(2)).onActivity(); + + long updatedLastRecordedTime = 500L; + // lastReportedTime = firstActivityTime = 123L; + ActivityState updatedState = new ActivityState<>(); + updatedState.setLastRecordedTime(updatedLastRecordedTime); + + when(integrationServiceMock.updateState(eq(key), any())).thenReturn(updatedState); + when(integrationServiceMock.hasExpired(updatedLastRecordedTime)).thenReturn(false); + when(strategyMock.onReportingPeriodEnd()).thenReturn(true); + + // WHEN + doCallRealMethod().when(integrationServiceMock).onReportingPeriodEnd(); + integrationServiceMock.onReportingPeriodEnd(); + + // THEN + assertThat(states).containsKey(key); + + when(integrationServiceMock.getLastRecordedTime(key)).thenCallRealMethod(); + assertThat(integrationServiceMock.getLastRecordedTime(key)).isEqualTo(updatedLastRecordedTime); + + verify(integrationServiceMock, never()).onStateExpiry(any(), any()); + + verify(strategyMock).onReportingPeriodEnd(); + verifyNoMoreInteractions(strategyMock); + + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(updatedLastRecordedTime), any()); + } + + @Test + void givenUpdatedStateIsNotNullAndHasNotExpiredAndStrategySaysThatShouldReportAndDoesNotHaveUnreportedEvents_whenOnReportingPeriodEnd_thenShouldNotReportAnything() { + // GIVEN + ConcurrentMap states = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(integrationServiceMock, "states", states); + + var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID); + ActivityStrategy strategyMock = mock(ActivityStrategy.class); + + // first reported event + when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>()); + when(strategyMock.onActivity()).thenReturn(true); + when(integrationServiceMock.getStrategy()).thenReturn(strategyMock); + + long activityTime = 123L; + doCallRealMethod().when(integrationServiceMock).onActivity(key, activityTime); + integrationServiceMock.onActivity(key, activityTime); + + ArgumentCaptor> firstCallbackCaptor = ArgumentCaptor.forClass(ActivityReportCallback.class); + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(activityTime), firstCallbackCaptor.capture()); + firstCallbackCaptor.getValue().onSuccess(key, activityTime); + + verify(strategyMock).onActivity(); + + long updatedLastRecordedTime = 123L; + // lastReportedTime = firstActivityTime = 123L; + ActivityState updatedState = new ActivityState<>(); + updatedState.setLastRecordedTime(updatedLastRecordedTime); + + when(integrationServiceMock.updateState(eq(key), any())).thenReturn(updatedState); + when(integrationServiceMock.hasExpired(updatedLastRecordedTime)).thenReturn(false); + when(strategyMock.onReportingPeriodEnd()).thenReturn(true); + + // WHEN + doCallRealMethod().when(integrationServiceMock).onReportingPeriodEnd(); + integrationServiceMock.onReportingPeriodEnd(); + + // THEN + assertThat(states).containsKey(key); + + when(integrationServiceMock.getLastRecordedTime(key)).thenCallRealMethod(); + assertThat(integrationServiceMock.getLastRecordedTime(key)).isEqualTo(updatedLastRecordedTime); + + verify(integrationServiceMock, never()).onStateExpiry(any(), any()); + + verify(strategyMock).onReportingPeriodEnd(); + verifyNoMoreInteractions(strategyMock); + + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(activityTime), any()); + } + + @Test + void givenUpdatedStateIsNotNullAndHasNotExpiredAndStrategySaysThatShouldNotReportAndHasUnreportedEvents_whenOnReportingPeriodEnd_thenShouldNotReportAnything() { + // GIVEN + ConcurrentMap states = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(integrationServiceMock, "states", states); + + var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID); + ActivityStrategy strategyMock = mock(ActivityStrategy.class); + + // first reported event + when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>()); + when(strategyMock.onActivity()).thenReturn(true); + when(integrationServiceMock.getStrategy()).thenReturn(strategyMock); + + long firstActivityTime = 123L; + doCallRealMethod().when(integrationServiceMock).onActivity(key, firstActivityTime); + integrationServiceMock.onActivity(key, firstActivityTime); + + ArgumentCaptor> firstCallbackCaptor = ArgumentCaptor.forClass(ActivityReportCallback.class); + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(firstActivityTime), firstCallbackCaptor.capture()); + firstCallbackCaptor.getValue().onSuccess(key, firstActivityTime); + + // second unreported event + long secondActivityTime = 456L; + when(strategyMock.onActivity()).thenReturn(false); + doCallRealMethod().when(integrationServiceMock).onActivity(key, secondActivityTime); + integrationServiceMock.onActivity(key, secondActivityTime); + + verify(integrationServiceMock, never()).reportActivity(eq(key), isNull(), eq(secondActivityTime), any()); + + verify(strategyMock, times(2)).onActivity(); + + long updatedLastRecordedTime = 500L; + // lastReportedTime = firstActivityTime = 123L; + ActivityState updatedState = new ActivityState<>(); + updatedState.setLastRecordedTime(updatedLastRecordedTime); + + when(integrationServiceMock.updateState(eq(key), any())).thenReturn(updatedState); + when(integrationServiceMock.hasExpired(updatedLastRecordedTime)).thenReturn(false); + when(strategyMock.onReportingPeriodEnd()).thenReturn(false); + + // WHEN + doCallRealMethod().when(integrationServiceMock).onReportingPeriodEnd(); + integrationServiceMock.onReportingPeriodEnd(); + + // THEN + assertThat(states).containsKey(key); + + when(integrationServiceMock.getLastRecordedTime(key)).thenCallRealMethod(); + assertThat(integrationServiceMock.getLastRecordedTime(key)).isEqualTo(updatedLastRecordedTime); + + verify(integrationServiceMock, never()).onStateExpiry(any(), any()); + + verify(strategyMock).onReportingPeriodEnd(); + verifyNoMoreInteractions(strategyMock); + + verify(integrationServiceMock, never()).reportActivity(eq(key), isNull(), eq(updatedLastRecordedTime), any()); + } + + @Test + void givenUpdatedStateIsNotNullAndHasExpiredAndStrategySaysThatShouldReportAndHasUnreportedEvents_whenOnReportingPeriodEnd_thenShouldReportEventUsingUpdatedState() { + // GIVEN + ConcurrentMap states = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(integrationServiceMock, "states", states); + + var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID); + ActivityStrategy strategyMock = mock(ActivityStrategy.class); + + // first reported event + when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>()); + when(strategyMock.onActivity()).thenReturn(true); + when(integrationServiceMock.getStrategy()).thenReturn(strategyMock); + + long firstActivityTime = 123L; + doCallRealMethod().when(integrationServiceMock).onActivity(key, firstActivityTime); + integrationServiceMock.onActivity(key, firstActivityTime); + + ArgumentCaptor> firstCallbackCaptor = ArgumentCaptor.forClass(ActivityReportCallback.class); + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(firstActivityTime), firstCallbackCaptor.capture()); + firstCallbackCaptor.getValue().onSuccess(key, firstActivityTime); + + // second unreported event + long secondActivityTime = 456L; + when(strategyMock.onActivity()).thenReturn(false); + doCallRealMethod().when(integrationServiceMock).onActivity(key, secondActivityTime); + integrationServiceMock.onActivity(key, secondActivityTime); + + verify(integrationServiceMock, never()).reportActivity(eq(key), isNull(), eq(secondActivityTime), any()); + + verify(strategyMock, times(2)).onActivity(); + + long updatedLastRecordedTime = 500L; + // lastReportedTime = firstActivityTime = 123L; + ActivityState updatedState = new ActivityState<>(); + updatedState.setLastRecordedTime(updatedLastRecordedTime); + + when(integrationServiceMock.updateState(eq(key), any())).thenReturn(updatedState); + when(integrationServiceMock.hasExpired(updatedLastRecordedTime)).thenReturn(true); + when(strategyMock.onReportingPeriodEnd()).thenReturn(true); + + // WHEN + doCallRealMethod().when(integrationServiceMock).onReportingPeriodEnd(); + integrationServiceMock.onReportingPeriodEnd(); + + // THEN + assertThat(states).isEmpty(); + + verify(integrationServiceMock).onStateExpiry(key, null); + + when(integrationServiceMock.getLastRecordedTime(key)).thenCallRealMethod(); + assertThat(integrationServiceMock.getLastRecordedTime(key)).isZero(); + + verify(strategyMock).onReportingPeriodEnd(); + verifyNoMoreInteractions(strategyMock); + + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(updatedLastRecordedTime), any()); + } + + @Test + void givenUpdatedStateIsNotNullAndHasExpiredAndStrategySaysThatShouldNotReportAndHasUnreportedEvents_whenOnReportingPeriodEnd_thenShouldReportEventUsingUpdatedState() { + // GIVEN + ConcurrentMap states = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(integrationServiceMock, "states", states); + + var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID); + ActivityStrategy strategyMock = mock(ActivityStrategy.class); + + // first reported event + when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>()); + when(strategyMock.onActivity()).thenReturn(true); + when(integrationServiceMock.getStrategy()).thenReturn(strategyMock); + + long firstActivityTime = 123L; + doCallRealMethod().when(integrationServiceMock).onActivity(key, firstActivityTime); + integrationServiceMock.onActivity(key, firstActivityTime); + + ArgumentCaptor> firstCallbackCaptor = ArgumentCaptor.forClass(ActivityReportCallback.class); + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(firstActivityTime), firstCallbackCaptor.capture()); + firstCallbackCaptor.getValue().onSuccess(key, firstActivityTime); + + // second unreported event + long secondActivityTime = 456L; + when(strategyMock.onActivity()).thenReturn(false); + doCallRealMethod().when(integrationServiceMock).onActivity(key, secondActivityTime); + integrationServiceMock.onActivity(key, secondActivityTime); + + verify(integrationServiceMock, never()).reportActivity(eq(key), isNull(), eq(secondActivityTime), any()); + + verify(strategyMock, times(2)).onActivity(); + + long updatedLastRecordedTime = 500L; + // lastReportedTime = firstActivityTime = 123L; + ActivityState updatedState = new ActivityState<>(); + updatedState.setLastRecordedTime(updatedLastRecordedTime); + + when(integrationServiceMock.updateState(eq(key), any())).thenReturn(updatedState); + when(integrationServiceMock.hasExpired(updatedLastRecordedTime)).thenReturn(true); + when(strategyMock.onReportingPeriodEnd()).thenReturn(false); + + // WHEN + doCallRealMethod().when(integrationServiceMock).onReportingPeriodEnd(); + integrationServiceMock.onReportingPeriodEnd(); + + // THEN + assertThat(states).isEmpty(); + + verify(integrationServiceMock).onStateExpiry(key, null); + + when(integrationServiceMock.getLastRecordedTime(key)).thenCallRealMethod(); + assertThat(integrationServiceMock.getLastRecordedTime(key)).isZero(); + + verify(strategyMock).onReportingPeriodEnd(); + verifyNoMoreInteractions(strategyMock); + + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(updatedLastRecordedTime), any()); + } + + // TODO: onReportingPeriodEnd() test for updating reported time + @Test void givenKeyAndTimeToReport_whenReportingActivity_thenShouldCorrectlyReportActivity() { // GIVEN diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java index 80ab8d8665..e9ed4cdcea 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java @@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; @Slf4j public abstract class AbstractActivityManager implements ActivityManager { @@ -91,6 +92,7 @@ public abstract class AbstractActivityManager implements Activity var shouldReport = new AtomicBoolean(false); var lastRecordedTime = new AtomicLong(); var lastReportedTime = new AtomicLong(); + var metadata = new AtomicReference(); var activityStateWrapper = states.compute(key, (__, stateWrapper) -> { if (stateWrapper == null) { @@ -109,6 +111,7 @@ public abstract class AbstractActivityManager implements Activity shouldReport.set(stateWrapper.getStrategy().onActivity()); lastRecordedTime.set(state.getLastRecordedTime()); lastReportedTime.set(stateWrapper.getLastReportedTime()); + metadata.set(state.getMetadata()); return stateWrapper; }); @@ -118,7 +121,7 @@ public abstract class AbstractActivityManager implements Activity if (shouldReport.get() && lastReportedTime.get() < lastRecordedTime.get()) { log.debug("Going to report first activity event for key: [{}].", key); - reportActivity(key, activityStateWrapper.getState().getMetadata(), lastRecordedTime.get(), new ActivityReportCallback<>() { + reportActivity(key, metadata.get(), lastRecordedTime.get(), new ActivityReportCallback<>() { @Override public void onSuccess(Key key, long reportedTime) { updateLastReportedTime(key, reportedTime); @@ -149,6 +152,7 @@ public abstract class AbstractActivityManager implements Activity var updatedState = updateState(key, currentState); if (updatedState != null) { + stateWrapper.setState(updatedState); lastRecordedTime = updatedState.getLastRecordedTime(); metadata = updatedState.getMetadata(); hasExpired = hasExpired(lastRecordedTime);