Merge pull request #13541 from dskarzh/activity-fix

Update cached device activity status only after a successful database save
This commit is contained in:
Viacheslav Klimov 2025-06-11 11:44:14 +03:00 committed by GitHub
commit 00e8c13336
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 445 additions and 445 deletions

View File

@ -21,6 +21,7 @@ import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.RuleEngineCalculatedFieldQueueService;
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import java.util.List;
@ -35,7 +36,7 @@ public interface CalculatedFieldQueueService extends RuleEngineCalculatedFieldQu
*/
void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback<Void> callback);
void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback);
void pushRequestToQueue(AttributesSaveRequest request, AttributesSaveResult result, FutureCallback<Void> callback);
void pushRequestToQueue(AttributesDeleteRequest request, List<String> result, FutureCallback<Void> callback);

View File

@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.msg.TbMsgType;
@ -96,7 +97,7 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
}
@Override
public void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback) {
public void pushRequestToQueue(AttributesSaveRequest request, AttributesSaveResult result, FutureCallback<Void> callback) {
var tenantId = request.getTenantId();
var entityId = request.getEntityId();
checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()),
@ -186,17 +187,18 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
return msg.build();
}
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List<Long> versions) {
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, AttributesSaveResult result) {
ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder();
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType());
telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name()));
List<AttributeKvEntry> entries = request.getEntries();
List<Long> versions = result.versions();
for (int i = 0; i < entries.size(); i++) {
AttributeValueProto.Builder attrProtoBuilder = ProtoUtils.toProto(entries.get(i)).toBuilder();
if (versions != null) {
attrProtoBuilder.setVersion(versions.get(i));
}
attrProtoBuilder.setVersion(versions.get(i));
telemetryMsg.addAttrData(attrProtoBuilder.build());
}
msg.setTelemetryMsg(telemetryMsg.build());

View File

@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.msg.TbMsgType;
@ -62,8 +63,6 @@ import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
@ -240,10 +239,11 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService {
return deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials);
}
private ListenableFuture<List<Long>> saveProvisionStateAttribute(Device device) {
return attributesService.save(device.getTenantId(), device.getId(), AttributeScope.SERVER_SCOPE,
Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry(DEVICE_PROVISION_STATE, PROVISIONED_STATE),
System.currentTimeMillis())));
private ListenableFuture<AttributesSaveResult> saveProvisionStateAttribute(Device device) {
return attributesService.save(
device.getTenantId(), device.getId(), AttributeScope.SERVER_SCOPE,
new BaseAttributeKvEntry(new StringDataEntry(DEVICE_PROVISION_STATE, PROVISIONED_STATE), System.currentTimeMillis())
);
}
private DeviceCredentials getDeviceCredentials(Device device) {

View File

@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
@ -581,10 +582,10 @@ public abstract class EdgeGrpcSession implements Closeable {
@Override
public void onSuccess(@Nullable Pair<Long, Long> newStartTsAndSeqId) {
if (newStartTsAndSeqId != null) {
ListenableFuture<List<Long>> updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId);
ListenableFuture<AttributesSaveResult> updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId);
Futures.addCallback(updateFuture, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<Long> list) {
public void onSuccess(@Nullable AttributesSaveResult saveResult) {
log.debug("[{}][{}] queue offset was updated [{}]", tenantId, edge.getId(), newStartTsAndSeqId);
boolean newEventsAvailable;
if (fetcher.isSeqIdNewCycleStarted()) {
@ -645,8 +646,7 @@ public abstract class EdgeGrpcSession implements Closeable {
log.trace("[{}][{}] entity message processed [{}]", tenantId, edge.getId(), downlinkMsg);
}
}
case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED ->
downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent);
case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED -> downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent);
default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, edge.getId(), edgeEvent.getAction());
}
} catch (Exception e) {
@ -722,13 +722,14 @@ public abstract class EdgeGrpcSession implements Closeable {
return startSeqId;
}
private ListenableFuture<List<Long>> updateQueueStartTsAndSeqId(Pair<Long, Long> pair) {
private ListenableFuture<AttributesSaveResult> updateQueueStartTsAndSeqId(Pair<Long, Long> pair) {
newStartTs = pair.getFirst();
newStartSeqId = pair.getSecond();
log.trace("[{}] updateQueueStartTsAndSeqId [{}][{}][{}]", sessionId, edge.getId(), newStartTs, newStartSeqId);
List<AttributeKvEntry> attributes = Arrays.asList(
List<AttributeKvEntry> attributes = List.of(
new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, newStartTs), System.currentTimeMillis()),
new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_SEQ_ID_ATTR_KEY, newStartSeqId), System.currentTimeMillis()));
new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_SEQ_ID_ATTR_KEY, newStartSeqId), System.currentTimeMillis())
);
return ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), AttributeScope.SERVER_SCOPE, attributes);
}

View File

@ -64,6 +64,7 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
@ -581,9 +582,9 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))), 0L);
addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value));
} else {
ListenableFuture<List<Long>> saveFuture = attributesService.save(TenantId.SYS_TENANT_ID, deviceId, AttributeScope.SERVER_SCOPE,
Collections.singletonList(new BaseAttributeKvEntry(new BooleanDataEntry(key, value)
, System.currentTimeMillis())));
ListenableFuture<AttributesSaveResult> saveFuture = attributesService.save(
TenantId.SYS_TENANT_ID, deviceId, AttributeScope.SERVER_SCOPE, new BaseAttributeKvEntry(new BooleanDataEntry(key, value), System.currentTimeMillis())
);
addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value));
}
}
@ -611,7 +612,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
}
private <S> void addTsCallback(ListenableFuture<S> saveFuture, final FutureCallback<S> callback) {
Futures.addCallback(saveFuture, new FutureCallback<S>() {
Futures.addCallback(saveFuture, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable S result) {
callback.onSuccess(result);

View File

@ -27,11 +27,10 @@ import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
@ -170,35 +169,22 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
@Lazy
private TelemetrySubscriptionService tsSubService;
@Value("${state.defaultInactivityTimeoutInSec}")
@Getter
@Setter
private long defaultInactivityTimeoutInSec;
@Value("#{${state.defaultInactivityTimeoutInSec} * 1000}")
@Getter
@Setter
private long defaultInactivityTimeoutMs;
@Value("${state.defaultStateCheckIntervalInSec}")
@Getter
private int defaultStateCheckIntervalInSec;
@Value("${usage.stats.devices.report_interval:60}")
@Getter
private int defaultActivityStatsIntervalInSec;
@Value("${state.persistToTelemetry:false}")
@Getter
@Setter
private boolean persistToTelemetry;
@Value("${state.initFetchPackSize:50000}")
@Getter
private int initFetchPackSize;
@Value("${state.telemetryTtl:0}")
@Getter
private int telemetryTtl;
private ListeningExecutorService deviceStateExecutor;
@ -281,12 +267,11 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
DeviceState state = stateData.getState();
state.setLastActivityTime(lastReportedActivity);
if (!state.isActive()) {
state.setActive(true);
if (lastReportedActivity <= state.getLastInactivityAlarmTime()) {
state.setLastInactivityAlarmTime(0);
save(stateData.getTenantId(), deviceId, INACTIVITY_ALARM_TIME, 0);
}
onDeviceActivityStatusChange(deviceId, true, stateData);
onDeviceActivityStatusChange(true, stateData);
}
} else {
log.debug("updateActivityState - fetched state IS NULL for device {}, lastReportedActivity {}", deviceId, lastReportedActivity);
@ -355,7 +340,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
return;
}
log.trace("[{}][{}] On device inactivity: processing inactivity event with ts [{}].", tenantId.getId(), deviceId.getId(), lastInactivityTime);
reportInactivity(lastInactivityTime, deviceId, stateData);
reportInactivity(lastInactivityTime, stateData);
}
@Override
@ -387,7 +372,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
@Override
public void onFailure(Throwable t) {
public void onFailure(@NonNull Throwable t) {
log.warn("Failed to register device to the state service", t);
callback.onFailure(t);
}
@ -539,7 +524,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
}
void reportActivityStats() {
private void reportActivityStats() {
try {
Map<TenantId, Pair<AtomicInteger, AtomicInteger>> stats = new HashMap<>();
for (DeviceStateData stateData : deviceStates.values()) {
@ -574,7 +559,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
&& (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() <= state.getLastActivityTime())
&& stateData.getDeviceCreationTime() + state.getInactivityTimeout() <= ts) {
if (partitionService.resolve(ServiceType.TB_CORE, stateData.getTenantId(), deviceId).isMyPartition()) {
reportInactivity(ts, deviceId, stateData);
reportInactivity(ts, stateData);
} else {
cleanupEntity(deviceId);
}
@ -585,15 +570,25 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
}
private void reportInactivity(long ts, DeviceId deviceId, DeviceStateData stateData) {
DeviceState state = stateData.getState();
state.setActive(false);
state.setLastInactivityAlarmTime(ts);
save(stateData.getTenantId(), deviceId, INACTIVITY_ALARM_TIME, ts);
onDeviceActivityStatusChange(deviceId, false, stateData);
private void reportInactivity(long ts, DeviceStateData stateData) {
var tenantId = stateData.getTenantId();
var deviceId = stateData.getDeviceId();
Futures.addCallback(save(stateData.getTenantId(), deviceId, INACTIVITY_ALARM_TIME, ts), new FutureCallback<>() {
@Override
public void onSuccess(Void success) {
stateData.getState().setLastInactivityAlarmTime(ts);
onDeviceActivityStatusChange(false, stateData);
}
@Override
public void onFailure(@NonNull Throwable t) {
log.error("[{}][{}] Failed to update device last inactivity alarm time to '{}'. Device state data: {}", tenantId, deviceId, ts, stateData, t);
}
}, deviceStateCallbackExecutor);
}
boolean isActive(long ts, DeviceState state) {
private static boolean isActive(long ts, DeviceState state) {
return ts < state.getLastActivityTime() + state.getInactivityTimeout();
}
@ -616,17 +611,32 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
}
private void onDeviceActivityStatusChange(DeviceId deviceId, boolean active, DeviceStateData stateData) {
save(stateData.getTenantId(), deviceId, ACTIVITY_STATE, active);
pushRuleEngineMessage(stateData, active ? TbMsgType.ACTIVITY_EVENT : TbMsgType.INACTIVITY_EVENT);
TbMsgMetaData metaData = stateData.getMetaData();
notificationRuleProcessor.process(DeviceActivityTrigger.builder()
.tenantId(stateData.getTenantId()).customerId(stateData.getCustomerId())
.deviceId(deviceId).active(active)
.deviceName(metaData.getValue("deviceName"))
.deviceType(metaData.getValue("deviceType"))
.deviceLabel(metaData.getValue("deviceLabel"))
.build());
private void onDeviceActivityStatusChange(boolean active, DeviceStateData stateData) {
var tenantId = stateData.getTenantId();
var deviceId = stateData.getDeviceId();
Futures.addCallback(save(tenantId, deviceId, ACTIVITY_STATE, active), new FutureCallback<>() {
@Override
public void onSuccess(Void success) {
stateData.getState().setActive(active);
pushRuleEngineMessage(stateData, active ? TbMsgType.ACTIVITY_EVENT : TbMsgType.INACTIVITY_EVENT);
TbMsgMetaData metaData = stateData.getMetaData();
notificationRuleProcessor.process(DeviceActivityTrigger.builder()
.tenantId(tenantId)
.customerId(stateData.getCustomerId())
.deviceId(deviceId)
.active(active)
.deviceName(metaData.getValue("deviceName"))
.deviceType(metaData.getValue("deviceType"))
.deviceLabel(metaData.getValue("deviceLabel"))
.build());
}
@Override
public void onFailure(@NonNull Throwable t) {
log.error("[{}][{}] Failed to change device activity status to '{}'. Device state data: {}", tenantId, deviceId, active, stateData, t);
}
}, deviceStateCallbackExecutor);
}
boolean cleanDeviceStateIfBelongsToExternalPartition(TenantId tenantId, final DeviceId deviceId) {
@ -634,8 +644,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
boolean cleanup = !partitionedEntities.containsKey(tpi);
if (cleanup) {
cleanupEntity(deviceId);
log.debug("[{}][{}] device belongs to external partition. Probably rebalancing is in progress. Topic: {}"
, tenantId, deviceId, tpi.getFullTopicName());
log.debug("[{}][{}] device belongs to external partition. Probably rebalancing is in progress. Topic: {}", tenantId, deviceId, tpi.getFullTopicName());
}
return cleanup;
}
@ -766,7 +775,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
DeviceStateData toDeviceStateData(EntityData ed, DeviceIdInfo deviceIdInfo) {
private DeviceStateData toDeviceStateData(EntityData ed, DeviceIdInfo deviceIdInfo) {
long lastActivityTime = getEntryValue(ed, getKeyType(), LAST_ACTIVITY_TIME, 0L);
long inactivityAlarmTime = getEntryValue(ed, getKeyType(), INACTIVITY_ALARM_TIME, 0L);
long inactivityTimeout = getEntryValue(ed, EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT, defaultInactivityTimeoutMs);
@ -849,6 +858,9 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
private void pushRuleEngineMessage(DeviceStateData stateData, TbMsgType msgType) {
var tenantId = stateData.getTenantId();
var deviceId = stateData.getDeviceId();
DeviceState state = stateData.getState();
try {
String data;
@ -865,7 +877,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
TbMsg tbMsg = TbMsg.newMsg()
.type(msgType)
.originator(stateData.getDeviceId())
.originator(deviceId)
.customerId(stateData.getCustomerId())
.copyMetaData(md)
.dataType(TbMsgDataType.JSON)
@ -873,21 +885,22 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
.build();
clusterService.pushMsgToRuleEngine(stateData.getTenantId(), stateData.getDeviceId(), tbMsg, null);
} catch (Exception e) {
log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e);
log.warn("[{}][{}] Failed to push '{}' message to the rule engine due to {}. Device state: {}", tenantId, deviceId, msgType, e.getMessage(), state);
}
}
private void save(TenantId tenantId, DeviceId deviceId, String key, long value) {
save(tenantId, deviceId, new LongDataEntry(key, value), getCurrentTimeMillis());
private ListenableFuture<Void> save(TenantId tenantId, DeviceId deviceId, String key, long value) {
return save(tenantId, deviceId, new LongDataEntry(key, value), getCurrentTimeMillis());
}
private void save(TenantId tenantId, DeviceId deviceId, String key, boolean value) {
save(tenantId, deviceId, new BooleanDataEntry(key, value), getCurrentTimeMillis());
private ListenableFuture<Void> save(TenantId tenantId, DeviceId deviceId, String key, boolean value) {
return save(tenantId, deviceId, new BooleanDataEntry(key, value), getCurrentTimeMillis());
}
private void save(TenantId tenantId, DeviceId deviceId, KvEntry kvEntry, long ts) {
private ListenableFuture<Void> save(TenantId tenantId, DeviceId deviceId, KvEntry kvEntry, long ts) {
ListenableFuture<?> future;
if (persistToTelemetry) {
tsSubService.saveTimeseriesInternal(TimeseriesSaveRequest.builder()
future = tsSubService.saveTimeseriesInternal(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(deviceId)
.entry(new BasicTsKvEntry(ts, kvEntry))
@ -895,7 +908,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
.callback(new TelemetrySaveCallback<>(deviceId, kvEntry))
.build());
} else {
tsSubService.saveAttributes(AttributesSaveRequest.builder()
future = tsSubService.saveAttributesInternal(AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(deviceId)
.scope(AttributeScope.SERVER_SCOPE)
@ -903,20 +916,14 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
.callback(new TelemetrySaveCallback<>(deviceId, kvEntry))
.build());
}
return Futures.transform(future, __ -> null, MoreExecutors.directExecutor());
}
long getCurrentTimeMillis() {
return System.currentTimeMillis();
}
private static class TelemetrySaveCallback<T> implements FutureCallback<T> {
private final DeviceId deviceId;
private final KvEntry kvEntry;
TelemetrySaveCallback(DeviceId deviceId, KvEntry kvEntry) {
this.deviceId = deviceId;
this.kvEntry = kvEntry;
}
private record TelemetrySaveCallback<T>(DeviceId deviceId, KvEntry kvEntry) implements FutureCallback<T> {
@Override
public void onSuccess(@Nullable T result) {
@ -924,9 +931,10 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
@Override
public void onFailure(Throwable t) {
public void onFailure(@NonNull Throwable t) {
log.warn("[{}] Failed to update entry {}", deviceId, kvEntry, t);
}
}
}

View File

@ -45,6 +45,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
@ -62,7 +63,6 @@ import org.thingsboard.server.service.state.DefaultDeviceStateService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -190,17 +190,16 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}
@Override
public void saveAttributesInternal(AttributesSaveRequest request) {
log.trace("Executing saveInternal [{}]", request);
public ListenableFuture<AttributesSaveResult> saveAttributesInternal(AttributesSaveRequest request) {
TenantId tenantId = request.getTenantId();
EntityId entityId = request.getEntityId();
AttributesSaveRequest.Strategy strategy = request.getStrategy();
ListenableFuture<List<Long>> resultFuture;
ListenableFuture<AttributesSaveResult> resultFuture;
if (strategy.saveAttributes()) {
resultFuture = attrService.save(tenantId, entityId, request.getScope(), request.getEntries());
} else {
resultFuture = Futures.immediateFuture(Collections.emptyList());
resultFuture = Futures.immediateFuture(AttributesSaveResult.EMPTY);
}
addMainCallback(resultFuture, result -> {
@ -228,6 +227,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
if (strategy.sendWsUpdate()) {
addWsCallback(resultFuture, success -> onAttributesUpdate(tenantId, entityId, request.getScope().name(), request.getEntries()));
}
return resultFuture;
}
private static boolean shouldSendSharedAttributesUpdatedNotification(AttributesSaveRequest request) {

View File

@ -21,6 +21,7 @@ import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
/**
@ -30,7 +31,7 @@ public interface InternalTelemetryService extends RuleEngineTelemetryService {
ListenableFuture<TimeseriesSaveResult> saveTimeseriesInternal(TimeseriesSaveRequest request);
void saveAttributesInternal(AttributesSaveRequest request);
ListenableFuture<AttributesSaveResult> saveAttributesInternal(AttributesSaveRequest request);
void deleteTimeseriesInternal(TimeseriesDeleteRequest request);

View File

@ -898,6 +898,8 @@ state:
# Used only when state.persistToTelemetry is set to 'true' and Cassandra is used for timeseries data.
# 0 means time-to-live mechanism is disabled.
telemetryTtl: "${STATE_TELEMETRY_TTL:0}"
# Number of device records to fetch per batch when initializing device activity states
initFetchPackSize: "${TB_DEVICE_STATE_INIT_FETCH_PACK_SIZE:50000}"
# Configuration properties for rule nodes related to device activity state
rule:
node:

View File

@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.IdBased;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
@ -395,7 +396,7 @@ public class EntityServiceTest extends AbstractControllerTest {
List<Long> highTemperatures = new ArrayList<>();
createTestHierarchy(tenantId, assets, devices, new ArrayList<>(), new ArrayList<>(), temperatures, highTemperatures);
List<ListenableFuture<List<Long>>> attributeFutures = new ArrayList<>();
List<ListenableFuture<AttributesSaveResult>> attributeFutures = new ArrayList<>();
for (int i = 0; i < devices.size(); i++) {
Device device = devices.get(i);
attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), AttributeScope.CLIENT_SCOPE));
@ -545,7 +546,7 @@ public class EntityServiceTest extends AbstractControllerTest {
List<Long> highTemperatures = new ArrayList<>();
createTestHierarchy(tenantId, assets, devices, new ArrayList<>(), new ArrayList<>(), temperatures, highTemperatures);
List<ListenableFuture<List<Long>>> attributeFutures = new ArrayList<>();
List<ListenableFuture<AttributesSaveResult>> attributeFutures = new ArrayList<>();
for (int i = 0; i < devices.size(); i++) {
Device device = devices.get(i);
attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), AttributeScope.CLIENT_SCOPE));
@ -599,7 +600,7 @@ public class EntityServiceTest extends AbstractControllerTest {
List<Long> highConsumptions = new ArrayList<>();
createTestHierarchy(tenantId, assets, devices, consumptions, highConsumptions, new ArrayList<>(), new ArrayList<>());
List<ListenableFuture<List<Long>>> attributeFutures = new ArrayList<>();
List<ListenableFuture<AttributesSaveResult>> attributeFutures = new ArrayList<>();
for (int i = 0; i < assets.size(); i++) {
Asset asset = assets.get(i);
attributeFutures.add(saveLongAttribute(asset.getId(), "consumption", consumptions.get(i), AttributeScope.SERVER_SCOPE));
@ -1506,7 +1507,7 @@ public class EntityServiceTest extends AbstractControllerTest {
}
}
List<ListenableFuture<List<Long>>> attributeFutures = new ArrayList<>();
List<ListenableFuture<AttributesSaveResult>> attributeFutures = new ArrayList<>();
for (int i = 0; i < devices.size(); i++) {
Device device = devices.get(i);
for (AttributeScope currentScope : AttributeScope.values()) {
@ -1578,7 +1579,7 @@ public class EntityServiceTest extends AbstractControllerTest {
}
}
List<ListenableFuture<List<Long>>> attributeFutures = new ArrayList<>();
List<ListenableFuture<AttributesSaveResult>> attributeFutures = new ArrayList<>();
for (int i = 0; i < devices.size(); i++) {
Device device = devices.get(i);
attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), AttributeScope.CLIENT_SCOPE));
@ -1808,7 +1809,7 @@ public class EntityServiceTest extends AbstractControllerTest {
}
}
List<ListenableFuture<List<Long>>> attributeFutures = new ArrayList<>();
List<ListenableFuture<AttributesSaveResult>> attributeFutures = new ArrayList<>();
for (int i = 0; i < devices.size(); i++) {
Device device = devices.get(i);
attributeFutures.add(saveStringAttribute(device.getId(), "attributeString", attributeStrings.get(i), AttributeScope.CLIENT_SCOPE));
@ -2269,16 +2270,16 @@ public class EntityServiceTest extends AbstractControllerTest {
return filter;
}
private ListenableFuture<List<Long>> saveLongAttribute(EntityId entityId, String key, long value, AttributeScope scope) {
private ListenableFuture<AttributesSaveResult> saveLongAttribute(EntityId entityId, String key, long value, AttributeScope scope) {
KvEntry attrValue = new LongDataEntry(key, value);
AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L);
return attributesService.save(tenantId, entityId, scope, Collections.singletonList(attr));
return attributesService.save(tenantId, entityId, scope, List.of(attr));
}
private ListenableFuture<List<Long>> saveStringAttribute(EntityId entityId, String key, String value, AttributeScope scope) {
private ListenableFuture<AttributesSaveResult> saveStringAttribute(EntityId entityId, String key, String value, AttributeScope scope) {
KvEntry attrValue = new StringDataEntry(key, value);
AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L);
return attributesService.save(tenantId, entityId, scope, Collections.singletonList(attr));
return attributesService.save(tenantId, entityId, scope, List.of(attr));
}
private ListenableFuture<TimeseriesSaveResult> saveTimeseries(EntityId entityId, String key, Double value) {
@ -2294,8 +2295,8 @@ public class EntityServiceTest extends AbstractControllerTest {
}
protected void createMultiRootHierarchy(List<Asset> buildings, List<Asset> apartments,
Map<String, Map<UUID, String>> entityNameByTypeMap,
Map<UUID, UUID> childParentRelationMap) throws InterruptedException {
Map<String, Map<UUID, String>> entityNameByTypeMap,
Map<UUID, UUID> childParentRelationMap) throws InterruptedException {
for (int k = 0; k < 3; k++) {
Asset building = new Asset();
building.setTenantId(tenantId);

View File

@ -16,6 +16,9 @@
package org.thingsboard.server.service.state;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -31,13 +34,13 @@ import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceIdInfo;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityTrigger;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
@ -50,20 +53,22 @@ import org.thingsboard.server.dao.sql.query.EntityQueryRepository;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.usagestats.DefaultTbApiUsageReportClient;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
@ -77,8 +82,8 @@ import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -90,7 +95,10 @@ import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAS
import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_DISCONNECT_TIME;
@ExtendWith(MockitoExtension.class)
public class DefaultDeviceStateServiceTest {
class DefaultDeviceStateServiceTest {
ListeningExecutorService deviceStateExecutor;
ListeningExecutorService deviceStateCallbackExecutor;
@Mock
DeviceService deviceService;
@ -113,25 +121,48 @@ public class DefaultDeviceStateServiceTest {
@Mock
DefaultTbApiUsageReportClient defaultTbApiUsageReportClient;
TenantId tenantId = new TenantId(UUID.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112"));
DeviceId deviceId = DeviceId.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112");
TopicPartitionInfo tpi;
long defaultInactivityTimeoutMs = Duration.ofMinutes(10L).toMillis();
TenantId tenantId = TenantId.fromUUID(UUID.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112"));
DeviceId deviceId = DeviceId.fromString("c209f718-42e5-11f0-9fe2-0242ac120002");
TopicPartitionInfo tpi = TopicPartitionInfo.builder()
.topic("tb_core")
.partition(0)
.myPartition(true)
.build();
DefaultDeviceStateService service;
@BeforeEach
public void setUp() {
void setUp() {
service = spy(new DefaultDeviceStateService(deviceService, attributesService, tsService, clusterService, partitionService, entityQueryRepository, null, defaultTbApiUsageReportClient, notificationRuleProcessor));
ReflectionTestUtils.setField(service, "tsSubService", telemetrySubscriptionService);
ReflectionTestUtils.setField(service, "defaultInactivityTimeoutMs", defaultInactivityTimeoutMs);
ReflectionTestUtils.setField(service, "defaultStateCheckIntervalInSec", 60);
ReflectionTestUtils.setField(service, "defaultActivityStatsIntervalInSec", 60);
ReflectionTestUtils.setField(service, "initFetchPackSize", 10);
ReflectionTestUtils.setField(service, "initFetchPackSize", 50000);
tpi = TopicPartitionInfo.builder().myPartition(true).build();
deviceStateExecutor = MoreExecutors.newDirectExecutorService();
ReflectionTestUtils.setField(service, "deviceStateExecutor", deviceStateExecutor);
deviceStateCallbackExecutor = MoreExecutors.newDirectExecutorService();
ReflectionTestUtils.setField(service, "deviceStateCallbackExecutor", deviceStateCallbackExecutor);
lenient().when(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(tpi);
ConcurrentMap<TopicPartitionInfo, Set<DeviceId>> partitionedEntities = new ConcurrentHashMap<>();
partitionedEntities.put(tpi, new HashSet<>());
ReflectionTestUtils.setField(service, "partitionedEntities", partitionedEntities);
}
@AfterEach
void cleanup() {
deviceStateExecutor.shutdownNow();
deviceStateCallbackExecutor.shutdownNow();
}
@Test
public void givenDeviceBelongsToExternalPartition_whenOnDeviceConnect_thenCleansStateAndDoesNotReportConnect() {
void givenDeviceBelongsToExternalPartition_whenOnDeviceConnect_thenCleansStateAndDoesNotReportConnect() {
// GIVEN
doReturn(true).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
@ -149,7 +180,7 @@ public class DefaultDeviceStateServiceTest {
@ParameterizedTest
@ValueSource(longs = {Long.MIN_VALUE, -100, -1})
public void givenNegativeLastConnectTime_whenOnDeviceConnect_thenSkipsThisEvent(long negativeLastConnectTime) {
void givenNegativeLastConnectTime_whenOnDeviceConnect_thenSkipsThisEvent(long negativeLastConnectTime) {
// GIVEN
doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
@ -166,7 +197,7 @@ public class DefaultDeviceStateServiceTest {
@ParameterizedTest
@MethodSource("provideOutdatedTimestamps")
public void givenOutdatedLastConnectTime_whenOnDeviceDisconnect_thenSkipsThisEvent(long outdatedLastConnectTime, long currentLastConnectTime) {
void givenOutdatedLastConnectTime_whenOnDeviceDisconnect_thenSkipsThisEvent(long outdatedLastConnectTime, long currentLastConnectTime) {
// GIVEN
doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
@ -188,7 +219,7 @@ public class DefaultDeviceStateServiceTest {
}
@Test
public void givenDeviceBelongsToMyPartition_whenOnDeviceConnect_thenReportsConnect() {
void givenDeviceBelongsToMyPartition_whenOnDeviceConnect_thenReportsConnect() {
// GIVEN
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
@ -202,11 +233,13 @@ public class DefaultDeviceStateServiceTest {
service.deviceStates.put(deviceId, deviceStateData);
long lastConnectTime = System.currentTimeMillis();
mockSuccessfulSaveAttributes();
// WHEN
service.onDeviceConnect(tenantId, deviceId, lastConnectTime);
// THEN
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request ->
request.getTenantId().equals(tenantId) && request.getEntityId().equals(deviceId) &&
request.getScope().equals(AttributeScope.SERVER_SCOPE) &&
request.getEntries().get(0).getKey().equals(LAST_CONNECT_TIME) &&
@ -221,7 +254,7 @@ public class DefaultDeviceStateServiceTest {
}
@Test
public void givenDeviceBelongsToExternalPartition_whenOnDeviceDisconnect_thenCleansStateAndDoesNotReportDisconnect() {
void givenDeviceBelongsToExternalPartition_whenOnDeviceDisconnect_thenCleansStateAndDoesNotReportDisconnect() {
// GIVEN
doReturn(true).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
@ -238,7 +271,7 @@ public class DefaultDeviceStateServiceTest {
@ParameterizedTest
@ValueSource(longs = {Long.MIN_VALUE, -100, -1})
public void givenNegativeLastDisconnectTime_whenOnDeviceDisconnect_thenSkipsThisEvent(long negativeLastDisconnectTime) {
void givenNegativeLastDisconnectTime_whenOnDeviceDisconnect_thenSkipsThisEvent(long negativeLastDisconnectTime) {
// GIVEN
doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
@ -254,7 +287,7 @@ public class DefaultDeviceStateServiceTest {
@ParameterizedTest
@MethodSource("provideOutdatedTimestamps")
public void givenOutdatedLastDisconnectTime_whenOnDeviceDisconnect_thenSkipsThisEvent(long outdatedLastDisconnectTime, long currentLastDisconnectTime) {
void givenOutdatedLastDisconnectTime_whenOnDeviceDisconnect_thenSkipsThisEvent(long outdatedLastDisconnectTime, long currentLastDisconnectTime) {
// GIVEN
doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
@ -275,7 +308,7 @@ public class DefaultDeviceStateServiceTest {
}
@Test
public void givenDeviceBelongsToMyPartition_whenOnDeviceDisconnect_thenReportsDisconnect() {
void givenDeviceBelongsToMyPartition_whenOnDeviceDisconnect_thenReportsDisconnect() {
// GIVEN
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
@ -289,11 +322,13 @@ public class DefaultDeviceStateServiceTest {
service.deviceStates.put(deviceId, deviceStateData);
long lastDisconnectTime = System.currentTimeMillis();
mockSuccessfulSaveAttributes();
// WHEN
service.onDeviceDisconnect(tenantId, deviceId, lastDisconnectTime);
// THEN
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request ->
request.getTenantId().equals(tenantId) && request.getEntityId().equals(deviceId) &&
request.getScope().equals(AttributeScope.SERVER_SCOPE) &&
request.getEntries().get(0).getKey().equals(LAST_DISCONNECT_TIME) &&
@ -308,7 +343,7 @@ public class DefaultDeviceStateServiceTest {
}
@Test
public void givenDeviceBelongsToExternalPartition_whenOnDeviceInactivity_thenCleansStateAndDoesNotReportInactivity() {
void givenDeviceBelongsToExternalPartition_whenOnDeviceInactivity_thenCleansStateAndDoesNotReportInactivity() {
// GIVEN
doReturn(true).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
@ -325,7 +360,7 @@ public class DefaultDeviceStateServiceTest {
@ParameterizedTest
@ValueSource(longs = {Long.MIN_VALUE, -100, -1})
public void givenNegativeLastInactivityTime_whenOnDeviceInactivity_thenSkipsThisEvent(long negativeLastInactivityTime) {
void givenNegativeLastInactivityTime_whenOnDeviceInactivity_thenSkipsThisEvent(long negativeLastInactivityTime) {
// GIVEN
doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
@ -341,7 +376,7 @@ public class DefaultDeviceStateServiceTest {
@ParameterizedTest
@MethodSource("provideOutdatedTimestamps")
public void givenReceivedInactivityTimeIsLessThanOrEqualToCurrentInactivityTime_whenOnDeviceInactivity_thenSkipsThisEvent(
void givenReceivedInactivityTimeIsLessThanOrEqualToCurrentInactivityTime_whenOnDeviceInactivity_thenSkipsThisEvent(
long outdatedLastInactivityTime, long currentLastInactivityTime
) {
// GIVEN
@ -365,7 +400,7 @@ public class DefaultDeviceStateServiceTest {
@ParameterizedTest
@MethodSource("provideOutdatedTimestamps")
public void givenReceivedInactivityTimeIsLessThanOrEqualToCurrentActivityTime_whenOnDeviceInactivity_thenSkipsThisEvent(
void givenReceivedInactivityTimeIsLessThanOrEqualToCurrentActivityTime_whenOnDeviceInactivity_thenSkipsThisEvent(
long outdatedLastInactivityTime, long currentLastActivityTime
) {
// GIVEN
@ -398,7 +433,7 @@ public class DefaultDeviceStateServiceTest {
}
@Test
public void givenDeviceBelongsToMyPartition_whenOnDeviceInactivity_thenReportsInactivity() {
void givenDeviceBelongsToMyPartition_whenOnDeviceInactivity_thenReportsInactivity() {
// GIVEN
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
@ -412,17 +447,19 @@ public class DefaultDeviceStateServiceTest {
service.deviceStates.put(deviceId, deviceStateData);
long lastInactivityTime = System.currentTimeMillis();
mockSuccessfulSaveAttributes();
// WHEN
service.onDeviceInactivity(tenantId, deviceId, lastInactivityTime);
// THEN
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request ->
request.getTenantId().equals(tenantId) && request.getEntityId().equals(deviceId) &&
request.getScope().equals(AttributeScope.SERVER_SCOPE) &&
request.getEntries().get(0).getKey().equals(INACTIVITY_ALARM_TIME) &&
request.getEntries().get(0).getValue().equals(lastInactivityTime)
));
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request ->
request.getTenantId().equals(tenantId) && request.getEntityId().equals(deviceId) &&
request.getScope().equals(AttributeScope.SERVER_SCOPE) &&
request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) &&
@ -445,7 +482,7 @@ public class DefaultDeviceStateServiceTest {
}
@Test
public void givenInactivityTimeoutReached_whenUpdateInactivityStateIfExpired_thenReportsInactivity() {
void givenInactivityTimeoutReached_whenUpdateInactivityStateIfExpired_thenReportsInactivity() {
// GIVEN
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
@ -456,16 +493,18 @@ public class DefaultDeviceStateServiceTest {
given(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId)).willReturn(tpi);
mockSuccessfulSaveAttributes();
// WHEN
service.updateInactivityStateIfExpired(System.currentTimeMillis(), deviceId, deviceStateData);
// THEN
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request ->
request.getTenantId().equals(tenantId) && request.getEntityId().equals(deviceId) &&
request.getScope().equals(AttributeScope.SERVER_SCOPE) &&
request.getEntries().get(0).getKey().equals(INACTIVITY_ALARM_TIME)
));
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request ->
request.getTenantId().equals(tenantId) && request.getEntityId().equals(deviceId) &&
request.getScope().equals(AttributeScope.SERVER_SCOPE) &&
request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) &&
@ -488,7 +527,7 @@ public class DefaultDeviceStateServiceTest {
}
@Test
public void givenDeviceIdFromDeviceStatesMap_whenGetOrFetchDeviceStateData_thenNoStackOverflow() {
void givenDeviceIdFromDeviceStatesMap_whenGetOrFetchDeviceStateData_thenNoStackOverflow() {
service.deviceStates.put(deviceId, deviceStateDataMock);
DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId);
assertThat(deviceStateData).isEqualTo(deviceStateDataMock);
@ -496,7 +535,7 @@ public class DefaultDeviceStateServiceTest {
}
@Test
public void givenDeviceIdWithoutDeviceStateInMap_whenGetOrFetchDeviceStateData_thenFetchDeviceStateData() {
void givenDeviceIdWithoutDeviceStateInMap_whenGetOrFetchDeviceStateData_thenFetchDeviceStateData() {
service.deviceStates.clear();
willReturn(deviceStateDataMock).given(service).fetchDeviceStateDataUsingSeparateRequests(deviceId);
DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId);
@ -504,29 +543,18 @@ public class DefaultDeviceStateServiceTest {
verify(service).fetchDeviceStateDataUsingSeparateRequests(deviceId);
}
private void initStateService(long timeout) throws InterruptedException {
service.stop();
reset(service, telemetrySubscriptionService);
service.setDefaultInactivityTimeoutMs(timeout);
service.init();
when(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(tpi);
when(entityQueryRepository.findEntityDataByQueryInternal(any())).thenReturn(new PageData<>());
var deviceIdInfo = new DeviceIdInfo(tenantId.getId(), null, deviceId.getId());
when(deviceService.findDeviceIdInfos(any()))
.thenReturn(new PageData<>(List.of(deviceIdInfo), 0, 1, false));
PartitionChangeEvent event = new PartitionChangeEvent(this, ServiceType.TB_CORE, Map.of(
new QueueKey(ServiceType.TB_CORE), Collections.singleton(tpi)
), Collections.emptyMap());
service.onApplicationEvent(event);
Thread.sleep(100);
}
@MethodSource
@ParameterizedTest
void testOnDeviceInactivityTimeoutUpdate(boolean initialActivityStatus, long newInactivityTimeout, boolean expectedActivityStatus) {
// GIVEN
doReturn(200L).when(service).getCurrentTimeMillis();
@Test
public void increaseInactivityForInactiveDeviceTest() throws Exception {
final long defaultTimeout = 1;
initStateService(defaultTimeout);
DeviceState deviceState = DeviceState.builder().build();
DeviceStateData deviceStateData = DeviceStateData.builder()
var deviceState = DeviceState.builder()
.active(initialActivityStatus)
.lastActivityTime(100L)
.build();
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(deviceState)
@ -536,174 +564,44 @@ public class DefaultDeviceStateServiceTest {
service.deviceStates.put(deviceId, deviceStateData);
service.getPartitionedEntities(tpi).add(deviceId);
service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
activityVerify(true);
Thread.sleep(defaultTimeout);
service.checkStates();
activityVerify(false);
mockSuccessfulSaveAttributes();
reset(telemetrySubscriptionService);
// WHEN
service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newInactivityTimeout);
long increase = 100;
long newTimeout = System.currentTimeMillis() - deviceState.getLastActivityTime() + increase;
// THEN
long expectedInactivityTimeout = newInactivityTimeout != 0 ? newInactivityTimeout : defaultInactivityTimeoutMs;
assertThat(deviceState.getInactivityTimeout()).isEqualTo(expectedInactivityTimeout);
service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout);
activityVerify(true);
Thread.sleep(increase);
service.checkStates();
activityVerify(false);
assertThat(deviceState.isActive()).isEqualTo(expectedActivityStatus);
if (initialActivityStatus != expectedActivityStatus) {
then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request -> {
AttributeKvEntry entry = request.getEntries().get(0);
return request.getEntityId().equals(deviceId) && entry.getKey().equals(ACTIVITY_STATE) && entry.getValue().equals(expectedActivityStatus);
}));
}
}
reset(telemetrySubscriptionService);
service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
activityVerify(true);
Thread.sleep(newTimeout + 5);
service.checkStates();
activityVerify(false);
// to simplify test, these arguments assume that the current time is 200 and the last activity time is 100
private static Stream<Arguments> testOnDeviceInactivityTimeoutUpdate() {
return Stream.of(
Arguments.of(true, 1L, false),
Arguments.of(true, 50L, false),
Arguments.of(true, 99L, false),
Arguments.of(true, 100L, false),
Arguments.of(true, 101L, true),
Arguments.of(true, 0L, true), // should use default inactivity timeout of 10 minutes
Arguments.of(false, 1L, false),
Arguments.of(false, 50L, false),
Arguments.of(false, 99L, false),
Arguments.of(false, 100L, false),
Arguments.of(false, 101L, true),
Arguments.of(false, 0L, true) // should use default inactivity timeout of 10 minutes
);
}
@Test
public void increaseInactivityForActiveDeviceTest() throws Exception {
final long defaultTimeout = 1000;
initStateService(defaultTimeout);
DeviceState deviceState = DeviceState.builder().build();
DeviceStateData deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(deviceState)
.metaData(new TbMsgMetaData())
.build();
service.deviceStates.put(deviceId, deviceStateData);
service.getPartitionedEntities(tpi).add(deviceId);
service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
activityVerify(true);
reset(telemetrySubscriptionService);
long increase = 100;
long newTimeout = System.currentTimeMillis() - deviceState.getLastActivityTime() + increase;
service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout);
verify(telemetrySubscriptionService, never()).saveAttributes(argThat(request ->
request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE)
));
Thread.sleep(defaultTimeout + increase);
service.checkStates();
activityVerify(false);
reset(telemetrySubscriptionService);
service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
activityVerify(true);
Thread.sleep(newTimeout);
service.checkStates();
activityVerify(false);
}
@Test
public void increaseSmallInactivityForInactiveDeviceTest() throws Exception {
final long defaultTimeout = 1;
initStateService(defaultTimeout);
DeviceState deviceState = DeviceState.builder().build();
DeviceStateData deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(deviceState)
.metaData(new TbMsgMetaData())
.build();
service.deviceStates.put(deviceId, deviceStateData);
service.getPartitionedEntities(tpi).add(deviceId);
service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
activityVerify(true);
Thread.sleep(defaultTimeout);
service.checkStates();
activityVerify(false);
reset(telemetrySubscriptionService);
long newTimeout = 1;
Thread.sleep(newTimeout);
verify(telemetrySubscriptionService, never()).saveAttributes(argThat(request ->
request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE)
));
}
@Test
public void decreaseInactivityForActiveDeviceTest() throws Exception {
final long defaultTimeout = 1000;
initStateService(defaultTimeout);
DeviceState deviceState = DeviceState.builder().build();
DeviceStateData deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(deviceState)
.metaData(new TbMsgMetaData())
.build();
service.deviceStates.put(deviceId, deviceStateData);
service.getPartitionedEntities(tpi).add(deviceId);
service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
activityVerify(true);
long newTimeout = 1;
Thread.sleep(newTimeout);
service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout);
activityVerify(false);
reset(telemetrySubscriptionService);
service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, defaultTimeout);
activityVerify(true);
Thread.sleep(defaultTimeout);
service.checkStates();
activityVerify(false);
}
@Test
public void decreaseInactivityForInactiveDeviceTest() throws Exception {
final long defaultTimeout = 1000;
initStateService(defaultTimeout);
DeviceState deviceState = DeviceState.builder().build();
DeviceStateData deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(deviceState)
.metaData(new TbMsgMetaData())
.build();
service.deviceStates.put(deviceId, deviceStateData);
service.getPartitionedEntities(tpi).add(deviceId);
service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
activityVerify(true);
Thread.sleep(defaultTimeout);
service.checkStates();
activityVerify(false);
reset(telemetrySubscriptionService);
long newTimeout = 1;
service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout);
verify(telemetrySubscriptionService, never()).saveAttributes(argThat(request ->
request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE)
));
}
private void activityVerify(boolean isActive) {
verify(telemetrySubscriptionService).saveAttributes(argThat(request ->
request.getEntityId().equals(deviceId) &&
request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) &&
request.getEntries().get(0).getValue().equals(isActive)
));
}
@Test
public void givenStateDataIsNull_whenUpdateActivityState_thenShouldCleanupDevice() {
void givenStateDataIsNull_whenUpdateActivityState_thenShouldCleanupDevice() {
// GIVEN
service.deviceStates.put(deviceId, deviceStateDataMock);
@ -719,7 +617,7 @@ public class DefaultDeviceStateServiceTest {
@ParameterizedTest
@MethodSource("provideParametersForUpdateActivityState")
public void givenTestParameters_whenUpdateActivityState_thenShouldBeInTheExpectedStateAndPerformExpectedActions(
void givenTestParameters_whenUpdateActivityState_thenShouldBeInTheExpectedStateAndPerformExpectedActions(
boolean activityState, long previousActivityTime, long lastReportedActivity, long inactivityAlarmTime,
long expectedInactivityAlarmTime, boolean shouldSetInactivityAlarmTimeToZero,
boolean shouldUpdateActivityStateToActive
@ -739,13 +637,15 @@ public class DefaultDeviceStateServiceTest {
.metaData(new TbMsgMetaData())
.build();
mockSuccessfulSaveAttributes();
// WHEN
service.updateActivityState(deviceId, deviceStateData, lastReportedActivity);
// THEN
assertThat(deviceState.isActive()).isEqualTo(true);
assertThat(deviceState.getLastActivityTime()).isEqualTo(lastReportedActivity);
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request ->
request.getEntityId().equals(deviceId) &&
request.getEntries().get(0).getKey().equals(LAST_ACTIVITY_TIME) &&
request.getEntries().get(0).getValue().equals(lastReportedActivity)
@ -753,7 +653,7 @@ public class DefaultDeviceStateServiceTest {
assertThat(deviceState.getLastInactivityAlarmTime()).isEqualTo(expectedInactivityAlarmTime);
if (shouldSetInactivityAlarmTimeToZero) {
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request ->
request.getEntityId().equals(deviceId) &&
request.getEntries().get(0).getKey().equals(INACTIVITY_ALARM_TIME) &&
request.getEntries().get(0).getValue().equals(0L)
@ -761,7 +661,7 @@ public class DefaultDeviceStateServiceTest {
}
if (shouldUpdateActivityStateToActive) {
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request ->
request.getEntityId().equals(deviceId) &&
request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) &&
request.getEntries().get(0).getValue().equals(true)
@ -809,59 +709,8 @@ public class DefaultDeviceStateServiceTest {
);
}
@ParameterizedTest
@MethodSource("provideParametersForDecreaseInactivityTimeout")
public void givenTestParameters_whenOnDeviceInactivityTimeout_thenShouldBeInTheExpectedStateAndPerformExpectedActions(
boolean activityState, long newInactivityTimeout, long timeIncrement, boolean expectedActivityState
) throws Exception {
// GIVEN
long defaultInactivityTimeout = 10000;
initStateService(defaultInactivityTimeout);
var currentTime = new AtomicLong(System.currentTimeMillis());
DeviceState deviceState = DeviceState.builder()
.active(activityState)
.lastActivityTime(currentTime.get())
.inactivityTimeout(defaultInactivityTimeout)
.build();
DeviceStateData deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(deviceState)
.metaData(new TbMsgMetaData())
.build();
service.deviceStates.put(deviceId, deviceStateData);
service.getPartitionedEntities(tpi).add(deviceId);
given(service.getCurrentTimeMillis()).willReturn(currentTime.addAndGet(timeIncrement));
// WHEN
service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newInactivityTimeout);
// THEN
assertThat(deviceState.getInactivityTimeout()).isEqualTo(newInactivityTimeout);
assertThat(deviceState.isActive()).isEqualTo(expectedActivityState);
if (activityState && !expectedActivityState) {
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) &&
request.getEntries().get(0).getValue().equals(false)
));
}
}
private static Stream<Arguments> provideParametersForDecreaseInactivityTimeout() {
return Stream.of(
Arguments.of(true, 1, 0, true),
Arguments.of(true, 1, 1, false)
);
}
@Test
public void givenStateDataIsNull_whenUpdateInactivityTimeoutIfExpired_thenShouldCleanupDevice() {
void givenStateDataIsNull_whenUpdateInactivityTimeoutIfExpired_thenShouldCleanupDevice() {
// GIVEN
service.deviceStates.put(deviceId, deviceStateDataMock);
@ -875,7 +724,7 @@ public class DefaultDeviceStateServiceTest {
}
@Test
public void givenNotMyPartition_whenUpdateInactivityTimeoutIfExpired_thenShouldCleanupDevice() {
void givenNotMyPartition_whenUpdateInactivityTimeoutIfExpired_thenShouldCleanupDevice() {
// GIVEN
long currentTime = System.currentTimeMillis();
@ -911,7 +760,7 @@ public class DefaultDeviceStateServiceTest {
@ParameterizedTest
@MethodSource("provideParametersForUpdateInactivityStateIfExpired")
public void givenTestParameters_whenUpdateInactivityStateIfExpired_thenShouldBeInTheExpectedStateAndPerformExpectedActions(
void givenTestParameters_whenUpdateInactivityStateIfExpired_thenShouldBeInTheExpectedStateAndPerformExpectedActions(
boolean activityState, long ts, long lastActivityTime, long lastInactivityAlarmTime, long inactivityTimeout, long deviceCreationTime,
boolean expectedActivityState, long expectedLastInactivityAlarmTime, boolean shouldUpdateActivityStateToInactive
) {
@ -933,6 +782,7 @@ public class DefaultDeviceStateServiceTest {
if (shouldUpdateActivityStateToInactive) {
given(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId)).willReturn(tpi);
mockSuccessfulSaveAttributes();
}
// WHEN
@ -943,7 +793,7 @@ public class DefaultDeviceStateServiceTest {
assertThat(state.getLastInactivityAlarmTime()).isEqualTo(expectedLastInactivityAlarmTime);
if (shouldUpdateActivityStateToInactive) {
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request ->
request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) &&
request.getEntries().get(0).getValue().equals(false)
));
@ -961,7 +811,7 @@ public class DefaultDeviceStateServiceTest {
assertThat(actualNotification.getDeviceId()).isEqualTo(deviceId);
assertThat(actualNotification.isActive()).isFalse();
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request ->
request.getTenantId().equals(tenantId) && request.getEntityId().equals(deviceId) &&
request.getScope().equals(AttributeScope.SERVER_SCOPE) &&
request.getEntries().get(0).getKey().equals(INACTIVITY_ALARM_TIME) &&
@ -1033,7 +883,79 @@ public class DefaultDeviceStateServiceTest {
}
@Test
public void givenConcurrentAccess_whenGetOrFetchDeviceStateData_thenFetchDeviceStateDataInvokedOnce() {
void givenInactiveDevice_whenActivityStatusChangesToActiveButFailedToSaveUpdatedActivityStatus_thenShouldNotUpdateCache2() {
// GIVEN
var deviceState = DeviceState.builder()
.active(false)
.lastActivityTime(100L)
.inactivityTimeout(50L)
.build();
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(deviceState)
.metaData(TbMsgMetaData.EMPTY)
.build();
service.deviceStates.put(deviceId, deviceStateData);
service.getPartitionedEntities(tpi).add(deviceId);
// WHEN-THEN
// simulating short DB outage
given(telemetrySubscriptionService.saveAttributesInternal(any())).willReturn(Futures.immediateFailedFuture(new RuntimeException("failed to save")));
doReturn(200L).when(service).getCurrentTimeMillis();
service.onDeviceActivity(tenantId, deviceId, 180L);
assertThat(deviceState.isActive()).isFalse(); // still inactive
// 10 millis pass... and new activity message it received
// this time DB save is successful
when(telemetrySubscriptionService.saveAttributesInternal(any())).thenReturn(Futures.immediateFuture(AttributesSaveResult.of(generateRandomVersions(1))));
doReturn(210L).when(service).getCurrentTimeMillis();
service.onDeviceActivity(tenantId, deviceId, 190L);
assertThat(deviceState.isActive()).isTrue();
}
@Test
void givenActiveDevice_whenActivityStatusChangesToInactiveButFailedToSaveUpdatedActivityStatus_thenShouldNotUpdateCache() {
// GIVEN
var deviceState = DeviceState.builder()
.active(true)
.lastActivityTime(100L)
.inactivityTimeout(50L)
.build();
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(deviceState)
.metaData(TbMsgMetaData.EMPTY)
.build();
service.deviceStates.put(deviceId, deviceStateData);
service.getPartitionedEntities(tpi).add(deviceId);
// WHEN-THEN (assuming periodic activity states check is done every 100 millis)
// simulating short DB outage
given(telemetrySubscriptionService.saveAttributesInternal(any())).willReturn(Futures.immediateFailedFuture(new RuntimeException("failed to save")));
doReturn(200L).when(service).getCurrentTimeMillis();
service.checkStates();
assertThat(deviceState.isActive()).isTrue(); // still active
// waiting 100 millis... periodic activity states check is triggered again
// this time DB save is successful
when(telemetrySubscriptionService.saveAttributesInternal(any())).thenReturn(Futures.immediateFuture(AttributesSaveResult.of(generateRandomVersions(1))));
doReturn(300L).when(service).getCurrentTimeMillis();
service.checkStates();
assertThat(deviceState.isActive()).isFalse();
}
@Test
void givenConcurrentAccess_whenGetOrFetchDeviceStateData_thenFetchDeviceStateDataInvokedOnce() {
doAnswer(invocation -> {
Thread.sleep(100);
return deviceStateDataMock;
@ -1069,10 +991,8 @@ public class DefaultDeviceStateServiceTest {
}
@Test
public void givenDeviceAdded_whenOnQueueMsg_thenShouldCacheAndSaveActivityToFalse() throws InterruptedException {
void givenDeviceAdded_whenOnQueueMsg_thenShouldCacheAndSaveActivityToFalse() {
// GIVEN
final long defaultTimeout = 1000;
initStateService(defaultTimeout);
given(deviceService.findDeviceById(any(TenantId.class), any(DeviceId.class))).willReturn(new Device(deviceId));
given(attributesService.find(any(TenantId.class), any(EntityId.class), any(AttributeScope.class), anyCollection())).willReturn(Futures.immediateFuture(Collections.emptyList()));
@ -1086,13 +1006,15 @@ public class DefaultDeviceStateServiceTest {
.setDeleted(false)
.build();
mockSuccessfulSaveAttributes();
// WHEN
service.onQueueMsg(proto, TbCallback.EMPTY);
// THEN
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(false);
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request ->
request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) &&
request.getEntries().get(0).getValue().equals(false)
));
@ -1100,14 +1022,12 @@ public class DefaultDeviceStateServiceTest {
}
@Test
public void givenDeviceActivityEventHappenedAfterAdded_whenOnDeviceActivity_thenShouldCacheAndSaveActivityToTrue() throws InterruptedException {
void givenDeviceActivityEventHappenedAfterAdded_whenOnDeviceActivity_thenShouldCacheAndSaveActivityToTrue() {
// GIVEN
final long defaultTimeout = 1000;
initStateService(defaultTimeout);
long currentTime = System.currentTimeMillis();
DeviceState deviceState = DeviceState.builder()
.active(false)
.inactivityTimeout(service.getDefaultInactivityTimeoutInSec())
.inactivityTimeout(defaultInactivityTimeoutMs)
.build();
DeviceStateData stateData = DeviceStateData.builder()
.tenantId(tenantId)
@ -1118,12 +1038,14 @@ public class DefaultDeviceStateServiceTest {
.build();
service.deviceStates.put(deviceId, stateData);
mockSuccessfulSaveAttributes();
// WHEN
service.onDeviceActivity(tenantId, deviceId, currentTime);
// THEN
ArgumentCaptor<AttributesSaveRequest> attributeRequestCaptor = ArgumentCaptor.forClass(AttributesSaveRequest.class);
then(telemetrySubscriptionService).should(times(2)).saveAttributes(attributeRequestCaptor.capture());
then(telemetrySubscriptionService).should(times(2)).saveAttributesInternal(attributeRequestCaptor.capture());
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(true);
@ -1151,15 +1073,14 @@ public class DefaultDeviceStateServiceTest {
}
@Test
public void givenDeviceActivityEventHappenedBeforeAdded_whenOnQueueMsg_thenShouldSaveActivityStateUsingValueFromCache() throws InterruptedException {
void givenDeviceActivityEventHappenedBeforeAdded_whenOnQueueMsg_thenShouldSaveActivityStateUsingValueFromCache() {
// GIVEN
final long defaultTimeout = 1000;
initStateService(defaultTimeout);
given(deviceService.findDeviceById(any(TenantId.class), any(DeviceId.class))).willReturn(new Device(deviceId));
given(attributesService.find(any(TenantId.class), any(EntityId.class), any(AttributeScope.class), anyCollection())).willReturn(Futures.immediateFuture(Collections.emptyList()));
long currentTime = System.currentTimeMillis();
DeviceState deviceState = DeviceState.builder()
var deviceState = DeviceState.builder()
.active(true)
.lastConnectTime(currentTime - 8000)
.lastActivityTime(currentTime - 4000)
@ -1167,16 +1088,20 @@ public class DefaultDeviceStateServiceTest {
.lastInactivityAlarmTime(0)
.inactivityTimeout(3000)
.build();
DeviceStateData stateData = DeviceStateData.builder()
var stateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.deviceCreationTime(currentTime - 10000)
.state(deviceState)
.build();
service.deviceStates.put(deviceId, stateData);
mockSuccessfulSaveAttributes();
// WHEN
TransportProtos.DeviceStateServiceMsgProto proto = TransportProtos.DeviceStateServiceMsgProto.newBuilder()
var proto = TransportProtos.DeviceStateServiceMsgProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
@ -1190,11 +1115,25 @@ public class DefaultDeviceStateServiceTest {
// THEN
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(true);
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
then(telemetrySubscriptionService).should().saveAttributesInternal(argThat(request ->
request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) &&
request.getEntries().get(0).getValue().equals(true)
));
});
}
private void mockSuccessfulSaveAttributes() {
lenient().when(telemetrySubscriptionService.saveAttributesInternal(any())).thenAnswer(invocation -> {
AttributesSaveRequest request = invocation.getArgument(0);
return Futures.immediateFuture(generateRandomVersions(request.getEntries().size()));
});
}
private static List<Long> generateRandomVersions(int n) {
return ThreadLocalRandom.current()
.longs(n)
.boxed()
.toList();
}
}

View File

@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
@ -472,7 +473,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(saveAttributes, sendWsUpdate, processCalculatedFields))
.build();
lenient().when(attrService.save(tenantId, entityId, request.getScope(), request.getEntries())).thenReturn(immediateFuture(listOfNNumbers(request.getEntries().size())));
lenient().when(attrService.save(tenantId, entityId, request.getScope(), request.getEntries()))
.thenReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size()))));
// WHEN
telemetryService.saveAttributes(request);
@ -547,7 +549,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build();
given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size())));
given(attrService.save(tenantId, deviceId, request.getScope(), entries))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(entries.size()))));
// WHEN
telemetryService.saveAttributes(request);
@ -581,7 +584,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build();
given(attrService.save(tenantId, nonDeviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size())));
given(attrService.save(tenantId, nonDeviceId, request.getScope(), entries))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(entries.size()))));
// WHEN
telemetryService.saveAttributes(request);
@ -613,7 +617,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build();
given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size())));
given(attrService.save(tenantId, deviceId, request.getScope(), entries))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(entries.size()))));
// WHEN
telemetryService.saveAttributes(request);
@ -640,7 +645,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build();
given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size())));
given(attrService.save(tenantId, deviceId, request.getScope(), entries))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(entries.size()))));
// WHEN
telemetryService.saveAttributes(request);
@ -715,7 +721,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build();
given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size())));
given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries()))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size()))));
// WHEN
telemetryService.saveAttributes(request);
@ -764,7 +771,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build();
given(attrService.save(tenantId, nonDeviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size())));
given(attrService.save(tenantId, nonDeviceId, request.getScope(), request.getEntries()))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size()))));
// WHEN
telemetryService.saveAttributes(request);
@ -792,7 +800,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build();
given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size())));
given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries()))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size()))));
// WHEN
telemetryService.saveAttributes(request);
@ -815,7 +824,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build();
given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size())));
given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries()))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size()))));
// WHEN
telemetryService.saveAttributes(request);
@ -843,7 +853,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build();
given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size())));
given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries()))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size()))));
// WHEN
telemetryService.saveAttributes(request);
@ -870,7 +881,8 @@ class DefaultTelemetrySubscriptionServiceTest {
.strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build();
given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries())).willReturn(immediateFuture(listOfNNumbers(request.getEntries().size())));
given(attrService.save(tenantId, deviceId, request.getScope(), request.getEntries()))
.willReturn(immediateFuture(AttributesSaveResult.of(listOfNNumbers(request.getEntries().size()))));
// WHEN
telemetryService.saveAttributes(request);

View File

@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import java.util.Collection;
import java.util.List;
@ -37,9 +38,9 @@ public interface AttributesService {
ListenableFuture<List<AttributeKvEntry>> findAll(TenantId tenantId, EntityId entityId, AttributeScope scope);
ListenableFuture<List<Long>> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes);
ListenableFuture<AttributesSaveResult> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes);
ListenableFuture<Long> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute);
ListenableFuture<AttributesSaveResult> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute);
ListenableFuture<List<String>> removeAll(TenantId tenantId, EntityId entityId, AttributeScope scope, List<String> attributeKeys);

View File

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.kv;
import java.util.Collections;
import java.util.List;
public record AttributesSaveResult(List<Long> versions) {
public static final AttributesSaveResult EMPTY = new AttributesSaveResult(Collections.emptyList());
public static AttributesSaveResult of(List<Long> versions) {
if (versions == null) {
return EMPTY;
}
return new AttributesSaveResult(versions);
}
}

View File

@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.edqs.EdqsService;
import org.thingsboard.server.dao.service.Validator;
@ -41,7 +42,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.thingsboard.server.dao.attributes.AttributeUtils.validate;
@ -101,26 +101,29 @@ public class BaseAttributesService implements AttributesService {
}
@Override
public ListenableFuture<Long> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
public ListenableFuture<AttributesSaveResult> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
validate(entityId, scope);
AttributeUtils.validate(attribute, valueNoXssValidation);
return doSave(tenantId, entityId, scope, attribute);
return doSave(tenantId, entityId, scope, List.of(attribute));
}
@Override
public ListenableFuture<List<Long>> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes) {
public ListenableFuture<AttributesSaveResult> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes) {
validate(entityId, scope);
AttributeUtils.validate(attributes, valueNoXssValidation);
List<ListenableFuture<Long>> saveFutures = attributes.stream().map(attribute -> doSave(tenantId, entityId, scope, attribute)).collect(Collectors.toList());
return Futures.allAsList(saveFutures);
return doSave(tenantId, entityId, scope, attributes);
}
private ListenableFuture<Long> doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
ListenableFuture<Long> future = attributesDao.save(tenantId, entityId, scope, attribute);
return Futures.transform(future, version -> {
edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attribute, version));
return version;
}, MoreExecutors.directExecutor());
private ListenableFuture<AttributesSaveResult> doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes) {
List<ListenableFuture<Long>> futures = new ArrayList<>(attributes.size());
for (AttributeKvEntry attribute : attributes) {
ListenableFuture<Long> future = Futures.transform(attributesDao.save(tenantId, entityId, scope, attribute), version -> {
edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attribute, version));
return version;
}, MoreExecutors.directExecutor());
futures.add(future);
}
return Futures.transform(Futures.allAsList(futures), AttributesSaveResult::of, MoreExecutors.directExecutor());
}
@Override

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.AttributesSaveResult;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.edqs.EdqsService;
@ -56,7 +57,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static org.thingsboard.server.dao.attributes.AttributeUtils.validate;
@ -150,7 +150,7 @@ public class CachedAttributesService implements AttributesService {
List<AttributeKvEntry> cachedAttributes = wrappedCachedAttributes.values().stream()
.map(TbCacheValueWrapper::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
.toList();
if (wrappedCachedAttributes.size() == attributeKeys.size()) {
log.trace("[{}][{}] Found all attributes from cache: {}", entityId, scope, attributeKeys);
return Futures.immediateFuture(cachedAttributes);
@ -159,8 +159,6 @@ public class CachedAttributesService implements AttributesService {
Set<String> notFoundAttributeKeys = new HashSet<>(attributeKeys);
notFoundAttributeKeys.removeAll(wrappedCachedAttributes.keySet());
List<AttributeCacheKey> notFoundKeys = notFoundAttributeKeys.stream().map(k -> new AttributeCacheKey(scope, entityId, k)).collect(Collectors.toList());
// DB call should run in DB executor, not in cache-related executor
return jpaExecutorService.submit(() -> {
log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys);
@ -222,33 +220,31 @@ public class CachedAttributesService implements AttributesService {
}
@Override
public ListenableFuture<Long> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
public ListenableFuture<AttributesSaveResult> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
validate(entityId, scope);
AttributeUtils.validate(attribute, valueNoXssValidation);
return doSave(tenantId, entityId, scope, attribute);
return doSave(tenantId, entityId, scope, List.of(attribute));
}
@Override
public ListenableFuture<List<Long>> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes) {
public ListenableFuture<AttributesSaveResult> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes) {
validate(entityId, scope);
AttributeUtils.validate(attributes, valueNoXssValidation);
List<ListenableFuture<Long>> futures = new ArrayList<>(attributes.size());
for (var attribute : attributes) {
futures.add(doSave(tenantId, entityId, scope, attribute));
}
return Futures.allAsList(futures);
return doSave(tenantId, entityId, scope, attributes);
}
private ListenableFuture<Long> doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
ListenableFuture<Long> future = attributesDao.save(tenantId, entityId, scope, attribute);
return Futures.transform(future, version -> {
BaseAttributeKvEntry attributeKvEntry = new BaseAttributeKvEntry(((BaseAttributeKvEntry) attribute).getKv(), attribute.getLastUpdateTs(), version);
put(entityId, scope, attributeKvEntry);
edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attributeKvEntry, version));
return version;
}, cacheExecutor);
private ListenableFuture<AttributesSaveResult> doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes) {
List<ListenableFuture<Long>> futures = new ArrayList<>(attributes.size());
for (var attribute : attributes) {
ListenableFuture<Long> future = Futures.transform(attributesDao.save(tenantId, entityId, scope, attribute), version -> {
BaseAttributeKvEntry attributeKvEntry = new BaseAttributeKvEntry(((BaseAttributeKvEntry) attribute).getKv(), attribute.getLastUpdateTs(), version);
put(entityId, scope, attributeKvEntry);
edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attributeKvEntry, version));
return version;
}, cacheExecutor);
futures.add(future);
}
return Futures.transform(Futures.allAsList(futures), AttributesSaveResult::of, MoreExecutors.directExecutor());
}
private void put(EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
@ -270,7 +266,7 @@ public class CachedAttributesService implements AttributesService {
edqsService.onDelete(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version));
}
return key;
}, cacheExecutor)).collect(Collectors.toList()));
}, cacheExecutor)).toList());
}
@Override