Merge pull request #10013 from dskarzh/fix/activity-manager/new-state-creation
Fix new activity state not being created correctly
This commit is contained in:
commit
6c6b606fdf
@ -28,10 +28,9 @@ import java.util.concurrent.ConcurrentMap;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public abstract class AbstractActivityManager<Key, Metadata> implements ActivityManager<Key> {
|
public abstract class AbstractActivityManager<Key, Metadata> implements ActivityManager<Key, Metadata> {
|
||||||
|
|
||||||
private final ConcurrentMap<Key, ActivityStateWrapper> states = new ConcurrentHashMap<>();
|
private final ConcurrentMap<Key, ActivityStateWrapper> states = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
@ -54,8 +53,6 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
|
|||||||
|
|
||||||
protected abstract long getReportingPeriodMillis();
|
protected abstract long getReportingPeriodMillis();
|
||||||
|
|
||||||
protected abstract ActivityState<Metadata> createNewState(Key key);
|
|
||||||
|
|
||||||
protected abstract ActivityStrategy getStrategy();
|
protected abstract ActivityStrategy getStrategy();
|
||||||
|
|
||||||
protected abstract ActivityState<Metadata> updateState(Key key, ActivityState<Metadata> state);
|
protected abstract ActivityState<Metadata> updateState(Key key, ActivityState<Metadata> state);
|
||||||
@ -67,7 +64,7 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
|
|||||||
protected abstract void reportActivity(Key key, Metadata metadata, long timeToReport, ActivityReportCallback<Key> callback);
|
protected abstract void reportActivity(Key key, Metadata metadata, long timeToReport, ActivityReportCallback<Key> callback);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onActivity(Key key, long newLastRecordedTime) {
|
public void onActivity(Key key, Metadata metadata, long newLastRecordedTime) {
|
||||||
if (key == null) {
|
if (key == null) {
|
||||||
log.error("Failed to process activity event: provided activity key is null.");
|
log.error("Failed to process activity event: provided activity key is null.");
|
||||||
return;
|
return;
|
||||||
@ -77,36 +74,28 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
|
|||||||
var shouldReport = new AtomicBoolean(false);
|
var shouldReport = new AtomicBoolean(false);
|
||||||
var lastRecordedTime = new AtomicLong();
|
var lastRecordedTime = new AtomicLong();
|
||||||
var lastReportedTime = new AtomicLong();
|
var lastReportedTime = new AtomicLong();
|
||||||
var metadata = new AtomicReference<Metadata>();
|
|
||||||
|
|
||||||
var activityStateWrapper = states.compute(key, (__, stateWrapper) -> {
|
states.compute(key, (__, stateWrapper) -> {
|
||||||
if (stateWrapper == null) {
|
if (stateWrapper == null) {
|
||||||
var newState = createNewState(key);
|
ActivityState<Metadata> newState = new ActivityState<>();
|
||||||
if (newState == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
stateWrapper = new ActivityStateWrapper();
|
stateWrapper = new ActivityStateWrapper();
|
||||||
stateWrapper.setState(newState);
|
stateWrapper.setState(newState);
|
||||||
stateWrapper.setStrategy(getStrategy());
|
stateWrapper.setStrategy(getStrategy());
|
||||||
}
|
}
|
||||||
var state = stateWrapper.getState();
|
var state = stateWrapper.getState();
|
||||||
|
state.setMetadata(metadata);
|
||||||
if (state.getLastRecordedTime() < newLastRecordedTime) {
|
if (state.getLastRecordedTime() < newLastRecordedTime) {
|
||||||
state.setLastRecordedTime(newLastRecordedTime);
|
state.setLastRecordedTime(newLastRecordedTime);
|
||||||
}
|
}
|
||||||
shouldReport.set(stateWrapper.getStrategy().onActivity());
|
shouldReport.set(stateWrapper.getStrategy().onActivity());
|
||||||
lastRecordedTime.set(state.getLastRecordedTime());
|
lastRecordedTime.set(state.getLastRecordedTime());
|
||||||
lastReportedTime.set(stateWrapper.getLastReportedTime());
|
lastReportedTime.set(stateWrapper.getLastReportedTime());
|
||||||
metadata.set(state.getMetadata());
|
|
||||||
return stateWrapper;
|
return stateWrapper;
|
||||||
});
|
});
|
||||||
|
|
||||||
if (activityStateWrapper == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (shouldReport.get() && lastReportedTime.get() < lastRecordedTime.get()) {
|
if (shouldReport.get() && lastReportedTime.get() < lastRecordedTime.get()) {
|
||||||
log.debug("Going to report first activity event for key: [{}].", key);
|
log.debug("Going to report first activity event for key: [{}].", key);
|
||||||
reportActivity(key, metadata.get(), lastRecordedTime.get(), new ActivityReportCallback<>() {
|
reportActivity(key, metadata, lastRecordedTime.get(), new ActivityReportCallback<>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(Key key, long reportedTime) {
|
public void onSuccess(Key key, long reportedTime) {
|
||||||
updateLastReportedTime(key, reportedTime);
|
updateLastReportedTime(key, reportedTime);
|
||||||
|
|||||||
@ -15,9 +15,9 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.common.transport.activity;
|
package org.thingsboard.server.common.transport.activity;
|
||||||
|
|
||||||
public interface ActivityManager<Key> {
|
public interface ActivityManager<Key, Metadata> {
|
||||||
|
|
||||||
void onActivity(Key key, long activityTimeMillis);
|
void onActivity(Key key, Metadata metadata, long activityTimeMillis);
|
||||||
|
|
||||||
void onReportingPeriodEnd();
|
void onReportingPeriodEnd();
|
||||||
|
|
||||||
|
|||||||
@ -96,8 +96,6 @@ import org.thingsboard.server.queue.TbQueueProducer;
|
|||||||
import org.thingsboard.server.queue.TbQueueRequestTemplate;
|
import org.thingsboard.server.queue.TbQueueRequestTemplate;
|
||||||
import org.thingsboard.server.queue.common.AsyncCallbackTemplate;
|
import org.thingsboard.server.queue.common.AsyncCallbackTemplate;
|
||||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||||
import org.thingsboard.server.queue.discovery.QueueKey;
|
|
||||||
import org.thingsboard.server.queue.discovery.TopicService;
|
|
||||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||||
import org.thingsboard.server.queue.discovery.TopicService;
|
import org.thingsboard.server.queue.discovery.TopicService;
|
||||||
@ -774,7 +772,7 @@ public class DefaultTransportService extends TransportActivityManager implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void recordActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
|
private void recordActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
|
||||||
onActivity(toSessionId(sessionInfo), getCurrentTimeMillis());
|
onActivity(toSessionId(sessionInfo), sessionInfo, getCurrentTimeMillis());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -57,17 +57,6 @@ public abstract class TransportActivityManager extends AbstractActivityManager<U
|
|||||||
return sessionReportTimeout;
|
return sessionReportTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ActivityState<TransportProtos.SessionInfoProto> createNewState(UUID sessionId) {
|
|
||||||
SessionMetaData session = sessions.get(sessionId);
|
|
||||||
if (session == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
ActivityState<TransportProtos.SessionInfoProto> state = new ActivityState<>();
|
|
||||||
state.setMetadata(session.getSessionInfo());
|
|
||||||
return state;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ActivityStrategy getStrategy() {
|
protected ActivityStrategy getStrategy() {
|
||||||
return reportingStrategyType.toStrategy();
|
return reportingStrategyType.toStrategy();
|
||||||
|
|||||||
@ -40,6 +40,8 @@ import java.util.concurrent.ConcurrentMap;
|
|||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.Mockito.doCallRealMethod;
|
import static org.mockito.Mockito.doCallRealMethod;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
@ -63,6 +65,34 @@ public class TransportActivityManagerTest {
|
|||||||
ReflectionTestUtils.setField(transportServiceMock, "sessions", sessions);
|
ReflectionTestUtils.setField(transportServiceMock, "sessions", sessions);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenFirstActivityForAlreadyRemovedSessionAndFirstEventReportingStrategy_whenOnActivity_thenShouldRecordActivityAndReport() {
|
||||||
|
// GIVEN
|
||||||
|
ConcurrentMap<UUID, Object> states = new ConcurrentHashMap<>();
|
||||||
|
ReflectionTestUtils.setField(transportServiceMock, "states", states);
|
||||||
|
|
||||||
|
var strategyMock = mock(ActivityStrategy.class);
|
||||||
|
when(transportServiceMock.getStrategy()).thenReturn(strategyMock);
|
||||||
|
when(strategyMock.onActivity()).thenReturn(true);
|
||||||
|
|
||||||
|
long activityTime = 123L;
|
||||||
|
var sessionInfo = TransportProtos.SessionInfoProto.newBuilder()
|
||||||
|
.setSessionIdMSB(SESSION_ID.getMostSignificantBits())
|
||||||
|
.setSessionIdLSB(SESSION_ID.getLeastSignificantBits())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
doCallRealMethod().when(transportServiceMock).getLastRecordedTime(SESSION_ID);
|
||||||
|
doCallRealMethod().when(transportServiceMock).onActivity(SESSION_ID, sessionInfo, activityTime);
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
transportServiceMock.onActivity(SESSION_ID, sessionInfo, activityTime);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
assertThat(states).containsKey(SESSION_ID);
|
||||||
|
assertThat(transportServiceMock.getLastRecordedTime(SESSION_ID)).isEqualTo(activityTime);
|
||||||
|
verify(transportServiceMock).reportActivity(eq(SESSION_ID), eq(sessionInfo), eq(activityTime), any(ActivityReportCallback.class));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void givenKeyAndTimeToReportAndSessionExists_whenReportingActivity_thenShouldReportActivityWithSubscriptionsAndSessionInfoFromSession() {
|
void givenKeyAndTimeToReportAndSessionExists_whenReportingActivity_thenShouldReportActivityWithSubscriptionsAndSessionInfoFromSession() {
|
||||||
// GIVEN
|
// GIVEN
|
||||||
@ -175,28 +205,7 @@ public class TransportActivityManagerTest {
|
|||||||
transportServiceMock.recordActivity(sessionInfo);
|
transportServiceMock.recordActivity(sessionInfo);
|
||||||
|
|
||||||
// THEN
|
// THEN
|
||||||
verify(transportServiceMock).onActivity(SESSION_ID, expectedTime);
|
verify(transportServiceMock).onActivity(SESSION_ID, sessionInfo, expectedTime);
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
void givenKey_whenCreatingNewState_thenShouldCorrectlyCreateNewEmptyState() {
|
|
||||||
// GIVEN
|
|
||||||
TransportProtos.SessionInfoProto sessionInfo = TransportProtos.SessionInfoProto.newBuilder()
|
|
||||||
.setSessionIdMSB(SESSION_ID.getMostSignificantBits())
|
|
||||||
.setSessionIdLSB(SESSION_ID.getLeastSignificantBits())
|
|
||||||
.build();
|
|
||||||
sessions.put(SESSION_ID, new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, null));
|
|
||||||
|
|
||||||
when(transportServiceMock.createNewState(SESSION_ID)).thenCallRealMethod();
|
|
||||||
|
|
||||||
ActivityState<TransportProtos.SessionInfoProto> expectedState = new ActivityState<>();
|
|
||||||
expectedState.setMetadata(sessionInfo);
|
|
||||||
|
|
||||||
// WHEN
|
|
||||||
ActivityState<TransportProtos.SessionInfoProto> actualState = transportServiceMock.createNewState(SESSION_ID);
|
|
||||||
|
|
||||||
// THEN
|
|
||||||
assertThat(actualState).isEqualTo(expectedState);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user