Merge pull request #7544 from dmytro-landiak/inactivity-fix
[3.4.2] inactivity timeout fix
This commit is contained in:
		
						commit
						08534d68c5
					
				@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 | 
				
			|||||||
import com.google.common.util.concurrent.ListeningExecutorService;
 | 
					import com.google.common.util.concurrent.ListeningExecutorService;
 | 
				
			||||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
					import com.google.common.util.concurrent.MoreExecutors;
 | 
				
			||||||
import lombok.Getter;
 | 
					import lombok.Getter;
 | 
				
			||||||
 | 
					import lombok.Setter;
 | 
				
			||||||
import lombok.extern.slf4j.Slf4j;
 | 
					import lombok.extern.slf4j.Slf4j;
 | 
				
			||||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
					import org.springframework.beans.factory.annotation.Autowired;
 | 
				
			||||||
import org.springframework.beans.factory.annotation.Value;
 | 
					import org.springframework.beans.factory.annotation.Value;
 | 
				
			||||||
@ -121,7 +122,8 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
 | 
				
			|||||||
            new EntityKey(EntityKeyType.TIME_SERIES, INACTIVITY_TIMEOUT),
 | 
					            new EntityKey(EntityKeyType.TIME_SERIES, INACTIVITY_TIMEOUT),
 | 
				
			||||||
            new EntityKey(EntityKeyType.TIME_SERIES, ACTIVITY_STATE),
 | 
					            new EntityKey(EntityKeyType.TIME_SERIES, ACTIVITY_STATE),
 | 
				
			||||||
            new EntityKey(EntityKeyType.TIME_SERIES, LAST_CONNECT_TIME),
 | 
					            new EntityKey(EntityKeyType.TIME_SERIES, LAST_CONNECT_TIME),
 | 
				
			||||||
            new EntityKey(EntityKeyType.TIME_SERIES, LAST_DISCONNECT_TIME));
 | 
					            new EntityKey(EntityKeyType.TIME_SERIES, LAST_DISCONNECT_TIME),
 | 
				
			||||||
 | 
					            new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private static final List<EntityKey> PERSISTENT_ATTRIBUTE_KEYS = Arrays.asList(
 | 
					    private static final List<EntityKey> PERSISTENT_ATTRIBUTE_KEYS = Arrays.asList(
 | 
				
			||||||
            new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, LAST_ACTIVITY_TIME),
 | 
					            new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, LAST_ACTIVITY_TIME),
 | 
				
			||||||
@ -152,14 +154,21 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    @Value("${state.defaultInactivityTimeoutInSec}")
 | 
					    @Value("${state.defaultInactivityTimeoutInSec}")
 | 
				
			||||||
    @Getter
 | 
					    @Getter
 | 
				
			||||||
 | 
					    @Setter
 | 
				
			||||||
    private long defaultInactivityTimeoutInSec;
 | 
					    private long defaultInactivityTimeoutInSec;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Value("#{${state.defaultInactivityTimeoutInSec} * 1000}")
 | 
				
			||||||
 | 
					    @Getter
 | 
				
			||||||
 | 
					    @Setter
 | 
				
			||||||
 | 
					    private long defaultInactivityTimeoutMs;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Value("${state.defaultStateCheckIntervalInSec}")
 | 
					    @Value("${state.defaultStateCheckIntervalInSec}")
 | 
				
			||||||
    @Getter
 | 
					    @Getter
 | 
				
			||||||
    private int defaultStateCheckIntervalInSec;
 | 
					    private int defaultStateCheckIntervalInSec;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Value("${state.persistToTelemetry:false}")
 | 
					    @Value("${state.persistToTelemetry:false}")
 | 
				
			||||||
    @Getter
 | 
					    @Getter
 | 
				
			||||||
 | 
					    @Setter
 | 
				
			||||||
    private boolean persistToTelemetry;
 | 
					    private boolean persistToTelemetry;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Value("${state.initFetchPackSize:50000}")
 | 
					    @Value("${state.initFetchPackSize:50000}")
 | 
				
			||||||
@ -540,7 +549,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    private ListenableFuture<DeviceStateData> transformInactivityTimeout(ListenableFuture<DeviceStateData> future) {
 | 
					    private ListenableFuture<DeviceStateData> transformInactivityTimeout(ListenableFuture<DeviceStateData> future) {
 | 
				
			||||||
        return Futures.transformAsync(future, deviceStateData -> {
 | 
					        return Futures.transformAsync(future, deviceStateData -> {
 | 
				
			||||||
            if (!persistToTelemetry || deviceStateData.getState().getInactivityTimeout() != TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec)) {
 | 
					            if (!persistToTelemetry || deviceStateData.getState().getInactivityTimeout() != defaultInactivityTimeoutMs) {
 | 
				
			||||||
                return future; //fail fast
 | 
					                return future; //fail fast
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            var attributesFuture = attributesService.find(TenantId.SYS_TENANT_ID, deviceStateData.getDeviceId(), SERVER_SCOPE, INACTIVITY_TIMEOUT);
 | 
					            var attributesFuture = attributesService.find(TenantId.SYS_TENANT_ID, deviceStateData.getDeviceId(), SERVER_SCOPE, INACTIVITY_TIMEOUT);
 | 
				
			||||||
@ -563,7 +572,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
 | 
				
			|||||||
                try {
 | 
					                try {
 | 
				
			||||||
                    long lastActivityTime = getEntryValue(data, LAST_ACTIVITY_TIME, 0L);
 | 
					                    long lastActivityTime = getEntryValue(data, LAST_ACTIVITY_TIME, 0L);
 | 
				
			||||||
                    long inactivityAlarmTime = getEntryValue(data, INACTIVITY_ALARM_TIME, 0L);
 | 
					                    long inactivityAlarmTime = getEntryValue(data, INACTIVITY_ALARM_TIME, 0L);
 | 
				
			||||||
                    long inactivityTimeout = getEntryValue(data, INACTIVITY_TIMEOUT, TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec));
 | 
					                    long inactivityTimeout = getEntryValue(data, INACTIVITY_TIMEOUT, defaultInactivityTimeoutMs);
 | 
				
			||||||
                    //Actual active state by wall-clock will updated outside this method. This method is only for fetch persistent state
 | 
					                    //Actual active state by wall-clock will updated outside this method. This method is only for fetch persistent state
 | 
				
			||||||
                    final boolean active = getEntryValue(data, ACTIVITY_STATE, false);
 | 
					                    final boolean active = getEntryValue(data, ACTIVITY_STATE, false);
 | 
				
			||||||
                    DeviceState deviceState = DeviceState.builder()
 | 
					                    DeviceState deviceState = DeviceState.builder()
 | 
				
			||||||
@ -634,10 +643,15 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private DeviceStateData toDeviceStateData(EntityData ed, DeviceIdInfo deviceIdInfo) {
 | 
					    DeviceStateData toDeviceStateData(EntityData ed, DeviceIdInfo deviceIdInfo) {
 | 
				
			||||||
        long lastActivityTime = getEntryValue(ed, getKeyType(), LAST_ACTIVITY_TIME, 0L);
 | 
					        long lastActivityTime = getEntryValue(ed, getKeyType(), LAST_ACTIVITY_TIME, 0L);
 | 
				
			||||||
        long inactivityAlarmTime = getEntryValue(ed, getKeyType(), INACTIVITY_ALARM_TIME, 0L);
 | 
					        long inactivityAlarmTime = getEntryValue(ed, getKeyType(), INACTIVITY_ALARM_TIME, 0L);
 | 
				
			||||||
        long inactivityTimeout = getEntryValue(ed, getKeyType(), INACTIVITY_TIMEOUT, TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec));
 | 
					        long inactivityTimeout = getEntryValue(ed, getKeyType(), INACTIVITY_TIMEOUT, defaultInactivityTimeoutMs);
 | 
				
			||||||
 | 
					        if (persistToTelemetry && inactivityTimeout == defaultInactivityTimeoutMs) {
 | 
				
			||||||
 | 
					            log.trace("[{}] default value for inactivity timeout fetched {}, going to fetch inactivity timeout from attributes",
 | 
				
			||||||
 | 
					                    deviceIdInfo.getDeviceId(), inactivityTimeout);
 | 
				
			||||||
 | 
					            inactivityTimeout = getEntryValue(ed, EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT, defaultInactivityTimeoutMs);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
        //Actual active state by wall-clock will updated outside this method. This method is only for fetch persistent state
 | 
					        //Actual active state by wall-clock will updated outside this method. This method is only for fetch persistent state
 | 
				
			||||||
        final boolean active = getEntryValue(ed, getKeyType(), ACTIVITY_STATE, false);
 | 
					        final boolean active = getEntryValue(ed, getKeyType(), ACTIVITY_STATE, false);
 | 
				
			||||||
        DeviceState deviceState = DeviceState.builder()
 | 
					        DeviceState deviceState = DeviceState.builder()
 | 
				
			||||||
 | 
				
			|||||||
@ -15,27 +15,37 @@
 | 
				
			|||||||
 */
 | 
					 */
 | 
				
			||||||
package org.thingsboard.server.service.state;
 | 
					package org.thingsboard.server.service.state;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import org.junit.Assert;
 | 
				
			||||||
import org.junit.Before;
 | 
					import org.junit.Before;
 | 
				
			||||||
import org.junit.Test;
 | 
					import org.junit.Test;
 | 
				
			||||||
import org.junit.runner.RunWith;
 | 
					import org.junit.runner.RunWith;
 | 
				
			||||||
import org.mockito.Mock;
 | 
					import org.mockito.Mock;
 | 
				
			||||||
import org.mockito.Mockito;
 | 
					import org.mockito.Mockito;
 | 
				
			||||||
import org.mockito.junit.MockitoJUnitRunner;
 | 
					import org.mockito.junit.MockitoJUnitRunner;
 | 
				
			||||||
 | 
					import org.thingsboard.server.cluster.TbClusterService;
 | 
				
			||||||
 | 
					import org.thingsboard.server.common.data.DeviceIdInfo;
 | 
				
			||||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
					import org.thingsboard.server.common.data.id.DeviceId;
 | 
				
			||||||
 | 
					import org.thingsboard.server.common.data.id.TenantId;
 | 
				
			||||||
 | 
					import org.thingsboard.server.common.data.query.EntityData;
 | 
				
			||||||
 | 
					import org.thingsboard.server.common.data.query.EntityKeyType;
 | 
				
			||||||
 | 
					import org.thingsboard.server.common.data.query.TsValue;
 | 
				
			||||||
import org.thingsboard.server.dao.attributes.AttributesService;
 | 
					import org.thingsboard.server.dao.attributes.AttributesService;
 | 
				
			||||||
import org.thingsboard.server.dao.device.DeviceService;
 | 
					import org.thingsboard.server.dao.device.DeviceService;
 | 
				
			||||||
import org.thingsboard.server.dao.tenant.TenantService;
 | 
					import org.thingsboard.server.dao.tenant.TenantService;
 | 
				
			||||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
 | 
					import org.thingsboard.server.dao.timeseries.TimeseriesService;
 | 
				
			||||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
					import org.thingsboard.server.queue.discovery.PartitionService;
 | 
				
			||||||
import org.thingsboard.server.cluster.TbClusterService;
 | 
					 | 
				
			||||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
					import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.util.Map;
 | 
				
			||||||
 | 
					import java.util.UUID;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import static org.hamcrest.CoreMatchers.is;
 | 
					import static org.hamcrest.CoreMatchers.is;
 | 
				
			||||||
import static org.hamcrest.MatcherAssert.assertThat;
 | 
					import static org.hamcrest.MatcherAssert.assertThat;
 | 
				
			||||||
import static org.mockito.BDDMockito.willReturn;
 | 
					import static org.mockito.BDDMockito.willReturn;
 | 
				
			||||||
import static org.mockito.Mockito.never;
 | 
					import static org.mockito.Mockito.never;
 | 
				
			||||||
import static org.mockito.Mockito.spy;
 | 
					import static org.mockito.Mockito.spy;
 | 
				
			||||||
import static org.mockito.Mockito.times;
 | 
					import static org.mockito.Mockito.times;
 | 
				
			||||||
 | 
					import static org.thingsboard.server.service.state.DefaultDeviceStateService.INACTIVITY_TIMEOUT;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@RunWith(MockitoJUnitRunner.class)
 | 
					@RunWith(MockitoJUnitRunner.class)
 | 
				
			||||||
public class DefaultDeviceStateServiceTest {
 | 
					public class DefaultDeviceStateServiceTest {
 | 
				
			||||||
@ -83,4 +93,40 @@ public class DefaultDeviceStateServiceTest {
 | 
				
			|||||||
        Mockito.verify(service, times(1)).fetchDeviceStateDataUsingEntityDataQuery(deviceId);
 | 
					        Mockito.verify(service, times(1)).fetchDeviceStateDataUsingEntityDataQuery(deviceId);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Test
 | 
				
			||||||
 | 
					    public void givenPersistToTelemetryAndDefaultInactivityTimeoutFetched_whenTransformingToDeviceStateData_thenTryGetInactivityFromAttribute() {
 | 
				
			||||||
 | 
					        var defaultInactivityTimeoutInSec = 60L;
 | 
				
			||||||
 | 
					        var latest =
 | 
				
			||||||
 | 
					                Map.of(
 | 
				
			||||||
 | 
					                        EntityKeyType.TIME_SERIES, Map.of(INACTIVITY_TIMEOUT, new TsValue(0, Long.toString(defaultInactivityTimeoutInSec * 1000))),
 | 
				
			||||||
 | 
					                        EntityKeyType.SERVER_ATTRIBUTE, Map.of(INACTIVITY_TIMEOUT, new TsValue(0, Long.toString(5000L)))
 | 
				
			||||||
 | 
					                );
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        process(latest, defaultInactivityTimeoutInSec);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Test
 | 
				
			||||||
 | 
					    public void givenPersistToTelemetryAndNoInactivityTimeoutFetchedFromTimeSeries_whenTransformingToDeviceStateData_thenTryGetInactivityFromAttribute() {
 | 
				
			||||||
 | 
					        var defaultInactivityTimeoutInSec = 60L;
 | 
				
			||||||
 | 
					        var latest =
 | 
				
			||||||
 | 
					                Map.of(
 | 
				
			||||||
 | 
					                        EntityKeyType.SERVER_ATTRIBUTE, Map.of(INACTIVITY_TIMEOUT, new TsValue(0, Long.toString(5000L)))
 | 
				
			||||||
 | 
					                );
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        process(latest, defaultInactivityTimeoutInSec);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private void process(Map<EntityKeyType, Map<String, TsValue>> latest, long defaultInactivityTimeoutInSec) {
 | 
				
			||||||
 | 
					        service.setDefaultInactivityTimeoutInSec(defaultInactivityTimeoutInSec);
 | 
				
			||||||
 | 
					        service.setDefaultInactivityTimeoutMs(defaultInactivityTimeoutInSec * 1000);
 | 
				
			||||||
 | 
					        service.setPersistToTelemetry(true);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        var deviceUuid = UUID.randomUUID();
 | 
				
			||||||
 | 
					        var deviceId = new DeviceId(deviceUuid);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        DeviceStateData deviceStateData = service.toDeviceStateData(new EntityData(deviceId, latest, Map.of()), new DeviceIdInfo(TenantId.SYS_TENANT_ID.getId(), UUID.randomUUID(), deviceUuid));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        Assert.assertEquals(5000L, deviceStateData.getState().getInactivityTimeout());
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user