From a31bd5e2c59dc8a37318f619c0d0848b5aafd9d4 Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Mon, 18 Dec 2023 13:40:32 +0200 Subject: [PATCH] Add tests for onActivity() in integration activity manager, add concurrency fix in onActivity() --- .../IntegrationActivityManagerTest.java | 190 +++++++++++++++++- .../activity/AbstractActivityManager.java | 29 +-- .../transport/activity/ActivityManager.java | 4 +- .../service/DefaultTransportService.java | 2 +- .../service/TransportActivityManagerTest.java | 4 +- 5 files changed, 211 insertions(+), 18 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/service/integration/IntegrationActivityManagerTest.java b/application/src/test/java/org/thingsboard/server/service/integration/IntegrationActivityManagerTest.java index 3b85261bac..c63b267fbf 100644 --- a/application/src/test/java/org/thingsboard/server/service/integration/IntegrationActivityManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/integration/IntegrationActivityManagerTest.java @@ -55,15 +55,20 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -78,6 +83,182 @@ public class IntegrationActivityManagerTest { @Mock private DefaultPlatformIntegrationService integrationServiceMock; + @Test + void givenKeyIsNull_whenOnActivity_thenShouldNotRecordAndShouldNotReport() { + // GIVEN + ConcurrentMap states = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(integrationServiceMock, "states", states); + + IntegrationActivityKey key = null; + + doCallRealMethod().when(integrationServiceMock).onActivity(eq(key), anyLong()); + + // WHEN + integrationServiceMock.onActivity(key, 123L); + + // THEN + assertThat(states.isEmpty()).isTrue(); + verify(integrationServiceMock, never()).createNewState(any()); + verify(integrationServiceMock, never()).getStrategy(); + verify(integrationServiceMock, never()).reportActivity(any(), any(), anyLong(), any()); + } + + @Test + void givenNewActivity_whenOnActivity_thenShouldCreateNewStateAndRecord() { + // GIVEN + ConcurrentMap states = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(integrationServiceMock, "states", states); + + ActivityStrategy strategyMock = mock(ActivityStrategy.class); + when(integrationServiceMock.getStrategy()).thenReturn(strategyMock); + + var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID); + when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>()); + + long activityTime = 123L; + doCallRealMethod().when(integrationServiceMock).onActivity(key, activityTime); + + when(integrationServiceMock.getLastRecordedTime(key)).thenCallRealMethod(); + + // WHEN + integrationServiceMock.onActivity(key, activityTime); + + // THEN + assertThat(integrationServiceMock.getLastRecordedTime(key)).isEqualTo(activityTime); + verify(integrationServiceMock).createNewState(key); + verify(integrationServiceMock).getStrategy(); + } + + @Test + void givenSubsequentActivities_whenOnActivity_thenShouldRecordBoth() { + // GIVEN + ConcurrentMap states = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(integrationServiceMock, "states", states); + + ActivityStrategy strategyMock = mock(ActivityStrategy.class); + when(integrationServiceMock.getStrategy()).thenReturn(strategyMock); + + var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID); + when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>()); + + when(integrationServiceMock.getLastRecordedTime(key)).thenCallRealMethod(); + + // WHEN-THEN + long firstActivityTime = 100L; + doCallRealMethod().when(integrationServiceMock).onActivity(key, firstActivityTime); + integrationServiceMock.onActivity(key, firstActivityTime); + assertThat(integrationServiceMock.getLastRecordedTime(key)).isEqualTo(firstActivityTime); + + long secondActivityTime = 123L; + doCallRealMethod().when(integrationServiceMock).onActivity(key, secondActivityTime); + integrationServiceMock.onActivity(key, secondActivityTime); + assertThat(integrationServiceMock.getLastRecordedTime(key)).isEqualTo(secondActivityTime); + } + + @Test + void givenActivityAndStrategySaysThatShouldNotReport_whenOnActivity_thenShouldNotReport() { + // GIVEN + ConcurrentMap states = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(integrationServiceMock, "states", states); + + ActivityStrategy strategyMock = mock(ActivityStrategy.class); + when(strategyMock.onActivity()).thenReturn(false); + + var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID); + when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>()); + when(integrationServiceMock.getStrategy()).thenReturn(strategyMock); + + long activityTime = 123L; + doCallRealMethod().when(integrationServiceMock).onActivity(key, activityTime); + + // WHEN + integrationServiceMock.onActivity(key, activityTime); + + // THEN + verify(integrationServiceMock, never()).reportActivity(any(), any(), anyLong(), any()); + } + + @Test + void givenActivityAndStrategySaysThatShouldReport_whenOnActivity_thenShouldReport() { + // GIVEN + ConcurrentMap states = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(integrationServiceMock, "states", states); + + ActivityStrategy strategyMock = mock(ActivityStrategy.class); + when(strategyMock.onActivity()).thenReturn(true); + + var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID); + when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>()); + when(integrationServiceMock.getStrategy()).thenReturn(strategyMock); + + long activityTime = 123L; + doCallRealMethod().when(integrationServiceMock).onActivity(key, activityTime); + + // WHEN + integrationServiceMock.onActivity(key, activityTime); + + // THEN + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(activityTime), any()); + } + + @Test + void givenActivityAndStrategySaysThatShouldReportButLastRecordedTimeIsEqualToLastReportedTime_whenOnActivity_thenShouldNotReport() { + // GIVEN + ConcurrentMap states = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(integrationServiceMock, "states", states); + + ActivityStrategy strategyMock = mock(ActivityStrategy.class); + when(strategyMock.onActivity()).thenReturn(true); + + var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID); + when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>()); + when(integrationServiceMock.getStrategy()).thenReturn(strategyMock); + + // WHEN-THEN + long firstActivityTime = 123L; + doCallRealMethod().when(integrationServiceMock).onActivity(key, firstActivityTime); + integrationServiceMock.onActivity(key, firstActivityTime); + + ArgumentCaptor> firstCallbackCaptor = ArgumentCaptor.forClass(ActivityReportCallback.class); + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(firstActivityTime), firstCallbackCaptor.capture()); + firstCallbackCaptor.getValue().onSuccess(key, firstActivityTime); + + long secondActivityTime = 100L; + doCallRealMethod().when(integrationServiceMock).onActivity(key, secondActivityTime); + integrationServiceMock.onActivity(key, secondActivityTime); + + verify(integrationServiceMock, never()).reportActivity(eq(key), any(), eq(secondActivityTime), any()); + } + + @Test + void givenActivityAndStrategySaysThatShouldReportAndLastRecordedTimeIsGreaterThanLastReportedTime_whenOnActivity_thenShouldReport() { + // GIVEN + ConcurrentMap states = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(integrationServiceMock, "states", states); + + ActivityStrategy strategyMock = mock(ActivityStrategy.class); + when(strategyMock.onActivity()).thenReturn(true); + + var key = new IntegrationActivityKey(TENANT_ID, DEVICE_ID); + when(integrationServiceMock.createNewState(key)).thenReturn(new ActivityState<>()); + when(integrationServiceMock.getStrategy()).thenReturn(strategyMock); + + // WHEN-THEN + long firstActivityTime = 123L; + doCallRealMethod().when(integrationServiceMock).onActivity(key, firstActivityTime); + integrationServiceMock.onActivity(key, firstActivityTime); + + ArgumentCaptor> firstCallbackCaptor = ArgumentCaptor.forClass(ActivityReportCallback.class); + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(firstActivityTime), firstCallbackCaptor.capture()); + firstCallbackCaptor.getValue().onSuccess(key, firstActivityTime); + + long secondActivityTime = 456L; + doCallRealMethod().when(integrationServiceMock).onActivity(key, secondActivityTime); + integrationServiceMock.onActivity(key, secondActivityTime); + + verify(integrationServiceMock).reportActivity(eq(key), isNull(), eq(secondActivityTime), any()); + } + @Test void givenKeyAndTimeToReport_whenReportingActivity_thenShouldCorrectlyReportActivity() { // GIVEN @@ -148,12 +329,14 @@ public class IntegrationActivityManagerTest { .build(); TransportProtos.PostTelemetryMsg postTelemetryMsg = TransportProtos.PostTelemetryMsg.getDefaultInstance(); doCallRealMethod().when(integrationServiceMock).process(sessionInfo, postTelemetryMsg, null); + long expectedTime = 123L; + when(integrationServiceMock.getCurrentTimeMillis()).thenReturn(expectedTime); // WHEN integrationServiceMock.process(sessionInfo, postTelemetryMsg, null); // THEN - verify(integrationServiceMock).onActivity(key); + verify(integrationServiceMock).onActivity(key, expectedTime); } @Test @@ -172,11 +355,14 @@ public class IntegrationActivityManagerTest { doCallRealMethod().when(integrationServiceMock).process(sessionInfo, postAttributeMsg, null); doNothing().when(integrationServiceMock).sendToRuleEngine(any(), any(), any(), any(), any(), any(), any()); + long activityTime = 123L; + when(integrationServiceMock.getCurrentTimeMillis()).thenReturn(activityTime); + // WHEN integrationServiceMock.process(sessionInfo, postAttributeMsg, null); // THEN - verify(integrationServiceMock).onActivity(key); + verify(integrationServiceMock).onActivity(key, activityTime); } @Test diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java index 6c41ea70ea..80ab8d8665 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java @@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; @Slf4j public abstract class AbstractActivityManager implements ActivityManager { @@ -80,15 +81,17 @@ public abstract class AbstractActivityManager implements Activity protected abstract void reportActivity(Key key, Metadata metadata, long timeToReport, ActivityReportCallback callback); @Override - public void onActivity(Key key) { + public void onActivity(Key key, long newLastRecordedTime) { if (key == null) { log.error("Failed to process activity event: provided activity key is null."); return; } log.debug("Received activity event for key: [{}]", key); - long newLastRecordedTime = System.currentTimeMillis(); var shouldReport = new AtomicBoolean(false); + var lastRecordedTime = new AtomicLong(); + var lastReportedTime = new AtomicLong(); + var activityStateWrapper = states.compute(key, (__, stateWrapper) -> { if (stateWrapper == null) { var newState = createNewState(key); @@ -104,6 +107,8 @@ public abstract class AbstractActivityManager implements Activity state.setLastRecordedTime(newLastRecordedTime); } shouldReport.set(stateWrapper.getStrategy().onActivity()); + lastRecordedTime.set(state.getLastRecordedTime()); + lastReportedTime.set(stateWrapper.getLastReportedTime()); return stateWrapper; }); @@ -111,12 +116,9 @@ public abstract class AbstractActivityManager implements Activity return; } - var activityState = activityStateWrapper.getState(); - long lastRecordedTime = activityState.getLastRecordedTime(); - long lastReportedTime = activityStateWrapper.getLastReportedTime(); - if (shouldReport.get() && lastReportedTime < lastRecordedTime) { + if (shouldReport.get() && lastReportedTime.get() < lastRecordedTime.get()) { log.debug("Going to report first activity event for key: [{}].", key); - reportActivity(key, activityState.getMetadata(), lastRecordedTime, new ActivityReportCallback<>() { + reportActivity(key, activityStateWrapper.getState().getMetadata(), lastRecordedTime.get(), new ActivityReportCallback<>() { @Override public void onSuccess(Key key, long reportedTime) { updateLastReportedTime(key, reportedTime); @@ -131,12 +133,7 @@ public abstract class AbstractActivityManager implements Activity } @Override - public long getLastRecordedTime(Key key) { - ActivityStateWrapper stateWrapper = states.get(key); - return stateWrapper == null ? 0L : stateWrapper.getState().getLastRecordedTime(); - } - - private void onReportingPeriodEnd() { + public void onReportingPeriodEnd() { log.debug("Going to end reporting period."); for (Map.Entry entry : states.entrySet()) { var key = entry.getKey(); @@ -185,6 +182,12 @@ public abstract class AbstractActivityManager implements Activity } } + @Override + public long getLastRecordedTime(Key key) { + ActivityStateWrapper stateWrapper = states.get(key); + return stateWrapper == null ? 0L : stateWrapper.getState().getLastRecordedTime(); + } + private void updateLastReportedTime(Key key, long newLastReportedTime) { states.computeIfPresent(key, (__, stateWrapper) -> { stateWrapper.setLastReportedTime(Math.max(stateWrapper.getLastReportedTime(), newLastReportedTime)); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java index f8881d1b61..c55c94cc54 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java @@ -32,7 +32,9 @@ package org.thingsboard.server.common.transport.activity; public interface ActivityManager { - void onActivity(Key key); + void onActivity(Key key, long activityTimeMillis); + + void onReportingPeriodEnd(); long getLastRecordedTime(Key key); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index f733efef67..dc58c79250 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -785,7 +785,7 @@ public class DefaultTransportService extends AbstractActivityManager