Save time series strategies: always handle inactivity timeout as a server attribute
This commit is contained in:
		
							parent
							
								
									877362def0
								
							
						
					
					
						commit
						fe16d10411
					
				@ -152,8 +152,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
 | 
			
		||||
                entityData = new HashMap<>();
 | 
			
		||||
                attributes = JacksonUtil.newObjectNode();
 | 
			
		||||
                for (AttributeKvEntry attr : ssAttributes) {
 | 
			
		||||
                    if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(attr.getKey())
 | 
			
		||||
                            && !DefaultDeviceStateService.INACTIVITY_TIMEOUT.equals(attr.getKey())) {
 | 
			
		||||
                    if (DefaultDeviceStateService.ACTIVITY_KEYS_WITHOUT_INACTIVITY_TIMEOUT.contains(attr.getKey())) {
 | 
			
		||||
                        continue;
 | 
			
		||||
                    }
 | 
			
		||||
                    if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) {
 | 
			
		||||
@ -200,7 +199,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
 | 
			
		||||
            }
 | 
			
		||||
            Map<Long, Map<String, Object>> tsData = new HashMap<>();
 | 
			
		||||
            for (TsKvEntry tsKvEntry : tsKvEntries) {
 | 
			
		||||
                if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(tsKvEntry.getKey())) {
 | 
			
		||||
                if (DefaultDeviceStateService.ACTIVITY_KEYS_WITH_INACTIVITY_TIMEOUT.contains(tsKvEntry.getKey())) {
 | 
			
		||||
                    continue;
 | 
			
		||||
                }
 | 
			
		||||
                tsData.computeIfAbsent(tsKvEntry.getTs(), k -> new HashMap<>()).put(tsKvEntry.getKey(), tsKvEntry.getValue());
 | 
			
		||||
 | 
			
		||||
@ -96,6 +96,7 @@ import java.util.HashSet;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Objects;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.Random;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
@ -129,11 +130,10 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
 | 
			
		||||
    private static final List<EntityKey> PERSISTENT_TELEMETRY_KEYS = Arrays.asList(
 | 
			
		||||
            new EntityKey(EntityKeyType.TIME_SERIES, LAST_ACTIVITY_TIME),
 | 
			
		||||
            new EntityKey(EntityKeyType.TIME_SERIES, INACTIVITY_ALARM_TIME),
 | 
			
		||||
            new EntityKey(EntityKeyType.TIME_SERIES, INACTIVITY_TIMEOUT),
 | 
			
		||||
            new EntityKey(EntityKeyType.TIME_SERIES, ACTIVITY_STATE),
 | 
			
		||||
            new EntityKey(EntityKeyType.TIME_SERIES, LAST_CONNECT_TIME),
 | 
			
		||||
            new EntityKey(EntityKeyType.TIME_SERIES, LAST_DISCONNECT_TIME),
 | 
			
		||||
            new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT));
 | 
			
		||||
            new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT)); // inactivity timeout is always a server attribute, even when activity data is stored as time series
 | 
			
		||||
 | 
			
		||||
    private static final List<EntityKey> PERSISTENT_ATTRIBUTE_KEYS = Arrays.asList(
 | 
			
		||||
            new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, LAST_ACTIVITY_TIME),
 | 
			
		||||
@ -143,8 +143,14 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
 | 
			
		||||
            new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, LAST_CONNECT_TIME),
 | 
			
		||||
            new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, LAST_DISCONNECT_TIME));
 | 
			
		||||
 | 
			
		||||
    public static final List<String> PERSISTENT_ATTRIBUTES = Arrays.asList(ACTIVITY_STATE, LAST_CONNECT_TIME,
 | 
			
		||||
            LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT);
 | 
			
		||||
    public static final List<String> ACTIVITY_KEYS_WITHOUT_INACTIVITY_TIMEOUT = List.of(
 | 
			
		||||
            ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    public static final List<String> ACTIVITY_KEYS_WITH_INACTIVITY_TIMEOUT = List.of(
 | 
			
		||||
            ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    private static final List<EntityKey> PERSISTENT_ENTITY_FIELDS = Arrays.asList(
 | 
			
		||||
            new EntityKey(EntityKeyType.ENTITY_FIELD, "name"),
 | 
			
		||||
            new EntityKey(EntityKeyType.ENTITY_FIELD, "type"),
 | 
			
		||||
@ -643,41 +649,45 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
 | 
			
		||||
        deviceStates.remove(deviceId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {
 | 
			
		||||
        ListenableFuture<DeviceStateData> future;
 | 
			
		||||
        if (persistToTelemetry) {
 | 
			
		||||
            ListenableFuture<List<TsKvEntry>> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES);
 | 
			
		||||
            future = Futures.transform(tsData, extractDeviceStateData(device), MoreExecutors.directExecutor());
 | 
			
		||||
            ListenableFuture<List<TsKvEntry>> timeseriesActivityDataFuture = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), ACTIVITY_KEYS_WITHOUT_INACTIVITY_TIMEOUT);
 | 
			
		||||
            ListenableFuture<Optional<AttributeKvEntry>> inactivityTimeoutAttributeFuture = attributesService.find(
 | 
			
		||||
                    TenantId.SYS_TENANT_ID, device.getId(), AttributeScope.SERVER_SCOPE, INACTIVITY_TIMEOUT
 | 
			
		||||
            );
 | 
			
		||||
 | 
			
		||||
            ListenableFuture<List<? extends KvEntry>> fullActivityDataFuture = Futures.whenAllSucceed(timeseriesActivityDataFuture, inactivityTimeoutAttributeFuture).call(() -> {
 | 
			
		||||
                List<TsKvEntry> activityTimeseries = Futures.getDone(timeseriesActivityDataFuture);
 | 
			
		||||
                Optional<AttributeKvEntry> inactivityTimeoutAttribute = Futures.getDone(inactivityTimeoutAttributeFuture);
 | 
			
		||||
 | 
			
		||||
                List<KvEntry> result;
 | 
			
		||||
                if (inactivityTimeoutAttribute.isPresent()) {
 | 
			
		||||
                    result = new ArrayList<>(activityTimeseries.size() + 1);
 | 
			
		||||
                    result.addAll(activityTimeseries);
 | 
			
		||||
                    inactivityTimeoutAttribute.ifPresent(result::add);
 | 
			
		||||
                } else {
 | 
			
		||||
                    return activityTimeseries;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                return result;
 | 
			
		||||
            }, deviceStateCallbackExecutor);
 | 
			
		||||
 | 
			
		||||
            future = Futures.transform(fullActivityDataFuture, extractDeviceStateData(device), MoreExecutors.directExecutor());
 | 
			
		||||
        } else {
 | 
			
		||||
            ListenableFuture<List<AttributeKvEntry>> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), AttributeScope.SERVER_SCOPE, PERSISTENT_ATTRIBUTES);
 | 
			
		||||
            future = Futures.transform(attrData, extractDeviceStateData(device), MoreExecutors.directExecutor());
 | 
			
		||||
            ListenableFuture<List<AttributeKvEntry>> attributesActivityDataFuture = attributesService.find(
 | 
			
		||||
                    TenantId.SYS_TENANT_ID, device.getId(), AttributeScope.SERVER_SCOPE, ACTIVITY_KEYS_WITH_INACTIVITY_TIMEOUT
 | 
			
		||||
            );
 | 
			
		||||
            future = Futures.transform(attributesActivityDataFuture, extractDeviceStateData(device), MoreExecutors.directExecutor());
 | 
			
		||||
        }
 | 
			
		||||
        return transformInactivityTimeout(future);
 | 
			
		||||
        return future;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<DeviceStateData> transformInactivityTimeout(ListenableFuture<DeviceStateData> future) {
 | 
			
		||||
        return Futures.transformAsync(future, deviceStateData -> {
 | 
			
		||||
            if (!persistToTelemetry || deviceStateData.getState().getInactivityTimeout() != defaultInactivityTimeoutMs) {
 | 
			
		||||
                return future; //fail fast
 | 
			
		||||
            }
 | 
			
		||||
            var attributesFuture = attributesService.find(TenantId.SYS_TENANT_ID, deviceStateData.getDeviceId(), AttributeScope.SERVER_SCOPE, INACTIVITY_TIMEOUT);
 | 
			
		||||
            return Futures.transform(attributesFuture, attributes -> {
 | 
			
		||||
                attributes.flatMap(KvEntry::getLongValue).ifPresent((inactivityTimeout) -> {
 | 
			
		||||
                    if (inactivityTimeout > 0) {
 | 
			
		||||
                        deviceStateData.getState().setInactivityTimeout(inactivityTimeout);
 | 
			
		||||
                    }
 | 
			
		||||
                });
 | 
			
		||||
                return deviceStateData;
 | 
			
		||||
            }, MoreExecutors.directExecutor());
 | 
			
		||||
        }, deviceStateCallbackExecutor);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private <T extends KvEntry> Function<List<T>, DeviceStateData> extractDeviceStateData(Device device) {
 | 
			
		||||
    private Function<List<? extends KvEntry>, DeviceStateData> extractDeviceStateData(Device device) {
 | 
			
		||||
        return new Function<>() {
 | 
			
		||||
            @Nonnull
 | 
			
		||||
            @Override
 | 
			
		||||
            public DeviceStateData apply(@Nullable List<T> data) {
 | 
			
		||||
            public DeviceStateData apply(@Nullable List<? extends KvEntry> data) {
 | 
			
		||||
                try {
 | 
			
		||||
                    long lastActivityTime = getEntryValue(data, LAST_ACTIVITY_TIME, 0L);
 | 
			
		||||
                    long inactivityAlarmTime = getEntryValue(data, INACTIVITY_ALARM_TIME, 0L);
 | 
			
		||||
@ -690,7 +700,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
 | 
			
		||||
                            .lastDisconnectTime(getEntryValue(data, LAST_DISCONNECT_TIME, 0L))
 | 
			
		||||
                            .lastActivityTime(lastActivityTime)
 | 
			
		||||
                            .lastInactivityAlarmTime(inactivityAlarmTime)
 | 
			
		||||
                            .inactivityTimeout(inactivityTimeout)
 | 
			
		||||
                            .inactivityTimeout(inactivityTimeout > 0 ? inactivityTimeout : defaultInactivityTimeoutMs)
 | 
			
		||||
                            .build();
 | 
			
		||||
                    TbMsgMetaData md = new TbMsgMetaData();
 | 
			
		||||
                    md.putValue("deviceName", device.getName());
 | 
			
		||||
@ -761,12 +771,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
 | 
			
		||||
    DeviceStateData toDeviceStateData(EntityData ed, DeviceIdInfo deviceIdInfo) {
 | 
			
		||||
        long lastActivityTime = getEntryValue(ed, getKeyType(), LAST_ACTIVITY_TIME, 0L);
 | 
			
		||||
        long inactivityAlarmTime = getEntryValue(ed, getKeyType(), INACTIVITY_ALARM_TIME, 0L);
 | 
			
		||||
        long inactivityTimeout = getEntryValue(ed, getKeyType(), INACTIVITY_TIMEOUT, defaultInactivityTimeoutMs);
 | 
			
		||||
        if (persistToTelemetry && inactivityTimeout == defaultInactivityTimeoutMs) {
 | 
			
		||||
            log.trace("[{}] default value for inactivity timeout fetched {}, going to fetch inactivity timeout from attributes",
 | 
			
		||||
                    deviceIdInfo.getDeviceId(), inactivityTimeout);
 | 
			
		||||
            inactivityTimeout = getEntryValue(ed, EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT, defaultInactivityTimeoutMs);
 | 
			
		||||
        }
 | 
			
		||||
        long inactivityTimeout = getEntryValue(ed, EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT, defaultInactivityTimeoutMs);
 | 
			
		||||
        // Actual active state by wall-clock will be updated outside this method. This method is only for fetching persistent state
 | 
			
		||||
        final boolean active = getEntryValue(ed, getKeyType(), ACTIVITY_STATE, false);
 | 
			
		||||
        DeviceState deviceState = DeviceState.builder()
 | 
			
		||||
 | 
			
		||||
@ -168,9 +168,6 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
 | 
			
		||||
    public void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, TbCallback callback) {
 | 
			
		||||
        onTimeSeriesUpdate(entityId,
 | 
			
		||||
                keys.stream().map(key -> new BasicTsKvEntry(0, new StringDataEntry(key, ""))).collect(Collectors.toList()));
 | 
			
		||||
        if (entityId.getEntityType() == EntityType.DEVICE) {
 | 
			
		||||
            deleteDeviceInactivityTimeout(tenantId, entityId, keys);
 | 
			
		||||
        }
 | 
			
		||||
        callback.onSuccess();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -31,7 +31,6 @@ import org.thingsboard.common.util.DonAsynchron;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.DeviceStateManager;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
 | 
			
		||||
@ -39,11 +38,9 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityView;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CustomerId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.KvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
 | 
			
		||||
@ -55,7 +52,6 @@ import org.thingsboard.server.dao.util.KvUtils;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
 | 
			
		||||
import org.thingsboard.server.service.cf.CalculatedFieldQueueService;
 | 
			
		||||
import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
 | 
			
		||||
import org.thingsboard.server.service.state.DefaultDeviceStateService;
 | 
			
		||||
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
@ -82,7 +78,6 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
 | 
			
		||||
    private final TbApiUsageReportClient apiUsageClient;
 | 
			
		||||
    private final TbApiUsageStateService apiUsageStateService;
 | 
			
		||||
    private final CalculatedFieldQueueService calculatedFieldQueueService;
 | 
			
		||||
    private final DeviceStateManager deviceStateManager;
 | 
			
		||||
 | 
			
		||||
    private ExecutorService tsCallBackExecutor;
 | 
			
		||||
 | 
			
		||||
@ -94,15 +89,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
 | 
			
		||||
                                               @Lazy TbEntityViewService tbEntityViewService,
 | 
			
		||||
                                               TbApiUsageReportClient apiUsageClient,
 | 
			
		||||
                                               TbApiUsageStateService apiUsageStateService,
 | 
			
		||||
                                               CalculatedFieldQueueService calculatedFieldQueueService,
 | 
			
		||||
                                               DeviceStateManager deviceStateManager) {
 | 
			
		||||
                                               CalculatedFieldQueueService calculatedFieldQueueService) {
 | 
			
		||||
        this.attrService = attrService;
 | 
			
		||||
        this.tsService = tsService;
 | 
			
		||||
        this.tbEntityViewService = tbEntityViewService;
 | 
			
		||||
        this.apiUsageClient = apiUsageClient;
 | 
			
		||||
        this.apiUsageStateService = apiUsageStateService;
 | 
			
		||||
        this.calculatedFieldQueueService = calculatedFieldQueueService;
 | 
			
		||||
        this.deviceStateManager = deviceStateManager;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
@ -165,14 +158,6 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
 | 
			
		||||
            }
 | 
			
		||||
        }, t -> request.getCallback().onFailure(t));
 | 
			
		||||
 | 
			
		||||
        if (entityId.getEntityType() == EntityType.DEVICE && request.getStrategy().saveLatest() /* Device State Service reads from the latest values when initializing */) {
 | 
			
		||||
            findNewInactivityTimeout(request.getEntries()).ifPresent(newInactivityTimeout ->
 | 
			
		||||
                    addMainCallback(resultFuture, __ -> deviceStateManager.onDeviceInactivityTimeoutUpdate(
 | 
			
		||||
                            tenantId, new DeviceId(entityId.getId()), newInactivityTimeout, TbCallback.EMPTY)
 | 
			
		||||
                    )
 | 
			
		||||
            );
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (strategy.sendWsUpdate()) {
 | 
			
		||||
            addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
 | 
			
		||||
        }
 | 
			
		||||
@ -182,21 +167,6 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
 | 
			
		||||
        return resultFuture;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static Optional<Long> findNewInactivityTimeout(List<TsKvEntry> entries) {
 | 
			
		||||
        return entries.stream()
 | 
			
		||||
                .filter(entry -> Objects.equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT, entry.getKey()))
 | 
			
		||||
                .findFirst()
 | 
			
		||||
                .map(DefaultTelemetrySubscriptionService::parseAsLong);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static long parseAsLong(KvEntry kve) {
 | 
			
		||||
        try {
 | 
			
		||||
            return Long.parseLong(kve.getValueAsString());
 | 
			
		||||
        } catch (NumberFormatException e) {
 | 
			
		||||
            return 0L;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void saveAttributes(AttributesSaveRequest request) {
 | 
			
		||||
        checkInternalEntity(request.getEntityId());
 | 
			
		||||
 | 
			
		||||
@ -38,9 +38,6 @@ import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.msg.TbMsgType;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityTrigger;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityData;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityKeyType;
 | 
			
		||||
import org.thingsboard.server.common.data.query.TsValue;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
 | 
			
		||||
@ -88,7 +85,6 @@ import static org.mockito.Mockito.verify;
 | 
			
		||||
import static org.mockito.Mockito.when;
 | 
			
		||||
import static org.thingsboard.server.service.state.DefaultDeviceStateService.ACTIVITY_STATE;
 | 
			
		||||
import static org.thingsboard.server.service.state.DefaultDeviceStateService.INACTIVITY_ALARM_TIME;
 | 
			
		||||
import static org.thingsboard.server.service.state.DefaultDeviceStateService.INACTIVITY_TIMEOUT;
 | 
			
		||||
import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_ACTIVITY_TIME;
 | 
			
		||||
import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_CONNECT_TIME;
 | 
			
		||||
import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_DISCONNECT_TIME;
 | 
			
		||||
@ -508,42 +504,6 @@ public class DefaultDeviceStateServiceTest {
 | 
			
		||||
        verify(service).fetchDeviceStateDataUsingSeparateRequests(deviceId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenPersistToTelemetryAndDefaultInactivityTimeoutFetched_whenTransformingToDeviceStateData_thenTryGetInactivityFromAttribute() {
 | 
			
		||||
        var defaultInactivityTimeoutInSec = 60L;
 | 
			
		||||
        var latest =
 | 
			
		||||
                Map.of(
 | 
			
		||||
                        EntityKeyType.TIME_SERIES, Map.of(INACTIVITY_TIMEOUT, new TsValue(0, Long.toString(defaultInactivityTimeoutInSec * 1000))),
 | 
			
		||||
                        EntityKeyType.SERVER_ATTRIBUTE, Map.of(INACTIVITY_TIMEOUT, new TsValue(0, Long.toString(5000L)))
 | 
			
		||||
                );
 | 
			
		||||
 | 
			
		||||
        process(latest, defaultInactivityTimeoutInSec);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenPersistToTelemetryAndNoInactivityTimeoutFetchedFromTimeSeries_whenTransformingToDeviceStateData_thenTryGetInactivityFromAttribute() {
 | 
			
		||||
        var defaultInactivityTimeoutInSec = 60L;
 | 
			
		||||
        var latest =
 | 
			
		||||
                Map.of(
 | 
			
		||||
                        EntityKeyType.SERVER_ATTRIBUTE, Map.of(INACTIVITY_TIMEOUT, new TsValue(0, Long.toString(5000L)))
 | 
			
		||||
                );
 | 
			
		||||
 | 
			
		||||
        process(latest, defaultInactivityTimeoutInSec);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void process(Map<EntityKeyType, Map<String, TsValue>> latest, long defaultInactivityTimeoutInSec) {
 | 
			
		||||
        service.setDefaultInactivityTimeoutInSec(defaultInactivityTimeoutInSec);
 | 
			
		||||
        service.setDefaultInactivityTimeoutMs(defaultInactivityTimeoutInSec * 1000);
 | 
			
		||||
        service.setPersistToTelemetry(true);
 | 
			
		||||
 | 
			
		||||
        var deviceUuid = UUID.randomUUID();
 | 
			
		||||
        var deviceId = new DeviceId(deviceUuid);
 | 
			
		||||
 | 
			
		||||
        DeviceStateData deviceStateData = service.toDeviceStateData(new EntityData(deviceId, latest, Map.of()), new DeviceIdInfo(TenantId.SYS_TENANT_ID.getId(), UUID.randomUUID(), deviceUuid));
 | 
			
		||||
 | 
			
		||||
        assertThat(deviceStateData.getState().getInactivityTimeout()).isEqualTo(5000L);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void initStateService(long timeout) throws InterruptedException {
 | 
			
		||||
        service.stop();
 | 
			
		||||
        reset(service, telemetrySubscriptionService);
 | 
			
		||||
 | 
			
		||||
@ -24,30 +24,25 @@ import org.junit.jupiter.api.Test;
 | 
			
		||||
import org.junit.jupiter.api.extension.ExtendWith;
 | 
			
		||||
import org.junit.jupiter.params.ParameterizedTest;
 | 
			
		||||
import org.junit.jupiter.params.provider.Arguments;
 | 
			
		||||
import org.junit.jupiter.params.provider.EnumSource;
 | 
			
		||||
import org.junit.jupiter.params.provider.MethodSource;
 | 
			
		||||
import org.mockito.Mock;
 | 
			
		||||
import org.mockito.junit.jupiter.MockitoExtension;
 | 
			
		||||
import org.springframework.test.util.ReflectionTestUtils;
 | 
			
		||||
import org.thingsboard.rule.engine.api.DeviceStateManager;
 | 
			
		||||
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
 | 
			
		||||
import org.thingsboard.server.cluster.TbClusterService;
 | 
			
		||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
 | 
			
		||||
import org.thingsboard.server.common.data.ApiUsageState;
 | 
			
		||||
import org.thingsboard.server.common.data.ApiUsageStateValue;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityView;
 | 
			
		||||
import org.thingsboard.server.common.data.id.ApiUsageStateId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CustomerId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityViewId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.KvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.objects.AttributesEntityView;
 | 
			
		||||
@ -78,17 +73,14 @@ import java.util.concurrent.ExecutorService;
 | 
			
		||||
import java.util.stream.LongStream;
 | 
			
		||||
import java.util.stream.Stream;
 | 
			
		||||
 | 
			
		||||
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
 | 
			
		||||
import static com.google.common.util.concurrent.Futures.immediateFuture;
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.anyLong;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.eq;
 | 
			
		||||
import static org.mockito.BDDMockito.given;
 | 
			
		||||
import static org.mockito.BDDMockito.then;
 | 
			
		||||
import static org.mockito.Mockito.lenient;
 | 
			
		||||
import static org.mockito.Mockito.never;
 | 
			
		||||
 | 
			
		||||
@ExtendWith(MockitoExtension.class)
 | 
			
		||||
class DefaultTelemetrySubscriptionServiceTest {
 | 
			
		||||
@ -132,14 +124,12 @@ class DefaultTelemetrySubscriptionServiceTest {
 | 
			
		||||
    TbApiUsageStateService apiUsageStateService;
 | 
			
		||||
    @Mock
 | 
			
		||||
    CalculatedFieldQueueService calculatedFieldQueueService;
 | 
			
		||||
    @Mock
 | 
			
		||||
    DeviceStateManager deviceStateManager;
 | 
			
		||||
 | 
			
		||||
    DefaultTelemetrySubscriptionService telemetryService;
 | 
			
		||||
 | 
			
		||||
    @BeforeEach
 | 
			
		||||
    void setup() {
 | 
			
		||||
        telemetryService = new DefaultTelemetrySubscriptionService(attrService, tsService, tbEntityViewService, apiUsageClient, apiUsageStateService, calculatedFieldQueueService, deviceStateManager);
 | 
			
		||||
        telemetryService = new DefaultTelemetrySubscriptionService(attrService, tsService, tbEntityViewService, apiUsageClient, apiUsageStateService, calculatedFieldQueueService);
 | 
			
		||||
        ReflectionTestUtils.setField(telemetryService, "clusterService", clusterService);
 | 
			
		||||
        ReflectionTestUtils.setField(telemetryService, "partitionService", partitionService);
 | 
			
		||||
        ReflectionTestUtils.setField(telemetryService, "subscriptionManagerService", Optional.of(subscriptionManagerService));
 | 
			
		||||
@ -180,6 +170,28 @@ class DefaultTelemetrySubscriptionServiceTest {
 | 
			
		||||
        tsCallBackExecutor.shutdownNow();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /* --- Save time series API --- */
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void shouldThrowErrorWhenTryingToSaveTimeseriesForApiUsageState() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        var request = TimeseriesSaveRequest.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .customerId(customerId)
 | 
			
		||||
                .entityId(new ApiUsageStateId(UUID.randomUUID()))
 | 
			
		||||
                .entries(sampleTelemetry)
 | 
			
		||||
                .strategy(TimeseriesSaveRequest.Strategy.PROCESS_ALL)
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        assertThatThrownBy(() -> telemetryService.saveTimeseries(request))
 | 
			
		||||
                .isInstanceOf(RuntimeException.class)
 | 
			
		||||
                .hasMessage("Can't update API Usage State!");
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        then(tsService).shouldHaveNoInteractions();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void shouldReportStorageDataPointsApiUsageWhenTimeSeriesIsSaved() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
@ -389,148 +401,6 @@ class DefaultTelemetrySubscriptionServiceTest {
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void shouldThrowErrorWhenTryingToSaveTimeseriesForApiUsageState() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        var request = TimeseriesSaveRequest.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .customerId(customerId)
 | 
			
		||||
                .entityId(new ApiUsageStateId(UUID.randomUUID()))
 | 
			
		||||
                .entries(sampleTelemetry)
 | 
			
		||||
                .strategy(TimeseriesSaveRequest.Strategy.PROCESS_ALL)
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        assertThatThrownBy(() -> telemetryService.saveTimeseries(request))
 | 
			
		||||
                .isInstanceOf(RuntimeException.class)
 | 
			
		||||
                .hasMessage("Can't update API Usage State!");
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        then(tsService).shouldHaveNoInteractions();
 | 
			
		||||
        then(deviceStateManager).shouldHaveNoInteractions();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void shouldNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesWasSavedToLatest() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088");
 | 
			
		||||
        var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L));
 | 
			
		||||
 | 
			
		||||
        var request = TimeseriesSaveRequest.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .customerId(customerId)
 | 
			
		||||
                .entityId(deviceId)
 | 
			
		||||
                .entry(inactivityTimeout)
 | 
			
		||||
                .strategy(new TimeseriesSaveRequest.Strategy(false, true, false, false))
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        given(tsService.saveLatest(tenantId, deviceId, List.of(inactivityTimeout))).willReturn(immediateFuture(TimeseriesSaveResult.of(1, listOfNNumbers(1))));
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        telemetryService.saveTimeseries(request);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        then(deviceStateManager).should().onDeviceInactivityTimeoutUpdate(tenantId, deviceId, 5000L, TbCallback.EMPTY);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @ParameterizedTest
 | 
			
		||||
    @EnumSource(
 | 
			
		||||
            value = EntityType.class,
 | 
			
		||||
            names = {"DEVICE", "API_USAGE_STATE"}, // API usage state excluded due to coverage in another test
 | 
			
		||||
            mode = EnumSource.Mode.EXCLUDE
 | 
			
		||||
    )
 | 
			
		||||
    void shouldNotNotifyDeviceStateManagerWhenInactivityTimeoutTimeseriesWasUpdatedButEntityTypeIsNotDevice(EntityType entityType) {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        var nonDeviceId = EntityIdFactory.getByTypeAndUuid(entityType, "cc51e450-53e1-11ee-883e-e56b48fd2088");
 | 
			
		||||
        var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L));
 | 
			
		||||
 | 
			
		||||
        var request = TimeseriesSaveRequest.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .customerId(customerId)
 | 
			
		||||
                .entityId(nonDeviceId)
 | 
			
		||||
                .entry(inactivityTimeout)
 | 
			
		||||
                .strategy(new TimeseriesSaveRequest.Strategy(false, true, false, false))
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        given(tsService.saveLatest(tenantId, nonDeviceId, List.of(inactivityTimeout))).willReturn(immediateFuture(TimeseriesSaveResult.of(1, listOfNNumbers(1))));
 | 
			
		||||
        lenient().when(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, nonDeviceId)).thenReturn(immediateFuture(Collections.emptyList()));
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        telemetryService.saveTimeseries(request);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void shouldNotNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesWasNotSavedToLatest() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088");
 | 
			
		||||
        var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L));
 | 
			
		||||
 | 
			
		||||
        var request = TimeseriesSaveRequest.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .customerId(customerId)
 | 
			
		||||
                .entityId(deviceId)
 | 
			
		||||
                .entry(inactivityTimeout)
 | 
			
		||||
                .strategy(new TimeseriesSaveRequest.Strategy(true, false, true, true))
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        given(tsService.saveWithoutLatest(tenantId, deviceId, List.of(inactivityTimeout), 0L)).willReturn(immediateFuture(TimeseriesSaveResult.of(1, null)));
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        telemetryService.saveTimeseries(request);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void shouldNotNotifyDeviceStateManagerWhenInactivityTimeoutTimeseriesWasNotUpdated() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088");
 | 
			
		||||
        var notInactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("notInactivityTimeout", 5000L));
 | 
			
		||||
 | 
			
		||||
        var request = TimeseriesSaveRequest.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .customerId(customerId)
 | 
			
		||||
                .entityId(deviceId)
 | 
			
		||||
                .entry(notInactivityTimeout)
 | 
			
		||||
                .strategy(new TimeseriesSaveRequest.Strategy(false, true, false, false))
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        given(tsService.saveLatest(tenantId, deviceId, List.of(notInactivityTimeout))).willReturn(immediateFuture(TimeseriesSaveResult.of(1, listOfNNumbers(1))));
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        telemetryService.saveTimeseries(request);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void shouldNotNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesSaveFailed() {
 | 
			
		||||
        // GIVEN
 | 
			
		||||
        var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088");
 | 
			
		||||
        var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L));
 | 
			
		||||
 | 
			
		||||
        var request = TimeseriesSaveRequest.builder()
 | 
			
		||||
                .tenantId(tenantId)
 | 
			
		||||
                .customerId(customerId)
 | 
			
		||||
                .entityId(deviceId)
 | 
			
		||||
                .entry(inactivityTimeout)
 | 
			
		||||
                .strategy(new TimeseriesSaveRequest.Strategy(false, true, false, false))
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        given(tsService.saveLatest(tenantId, deviceId, List.of(inactivityTimeout))).willReturn(immediateFailedFuture(new RuntimeException("failed to save")));
 | 
			
		||||
 | 
			
		||||
        // WHEN
 | 
			
		||||
        telemetryService.saveTimeseries(request);
 | 
			
		||||
 | 
			
		||||
        // THEN
 | 
			
		||||
        then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // used to emulate sequence numbers returned by save latest API
 | 
			
		||||
    private static List<Long> listOfNNumbers(int N) {
 | 
			
		||||
        return LongStream.range(0, N).boxed().toList();
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user