This commit is contained in:
Andrii Shvaika 2025-03-06 16:55:30 +02:00
commit 3a62f47edb
27 changed files with 452 additions and 346 deletions

View File

@ -33,7 +33,7 @@ import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.SmsService;
import org.thingsboard.rule.engine.api.notification.SlackService;
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
@ -233,7 +233,7 @@ public class ActorSystemContext {
@Autowired(required = false)
@Getter
private RuleEngineDeviceStateManager deviceStateManager;
private DeviceStateManager deviceStateManager;
@Autowired
@Getter

View File

@ -29,7 +29,7 @@ import org.thingsboard.rule.engine.api.RuleEngineAlarmService;
import org.thingsboard.rule.engine.api.RuleEngineApiUsageStateService;
import org.thingsboard.rule.engine.api.RuleEngineAssetProfileCache;
import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.ScriptEngine;
@ -725,7 +725,7 @@ public class DefaultTbContext implements TbContext {
}
@Override
public RuleEngineDeviceStateManager getDeviceStateManager() {
public DeviceStateManager getDeviceStateManager() {
return mainCtx.getDeviceStateManager();
}

View File

@ -152,8 +152,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
entityData = new HashMap<>();
attributes = JacksonUtil.newObjectNode();
for (AttributeKvEntry attr : ssAttributes) {
if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(attr.getKey())
&& !DefaultDeviceStateService.INACTIVITY_TIMEOUT.equals(attr.getKey())) {
if (DefaultDeviceStateService.ACTIVITY_KEYS_WITHOUT_INACTIVITY_TIMEOUT.contains(attr.getKey())) {
continue;
}
if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) {
@ -200,7 +199,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
}
Map<Long, Map<String, Object>> tsData = new HashMap<>();
for (TsKvEntry tsKvEntry : tsKvEntries) {
if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(tsKvEntry.getKey())) {
if (DefaultDeviceStateService.ACTIVITY_KEYS_WITH_INACTIVITY_TIMEOUT.contains(tsKvEntry.getKey())) {
continue;
}
tsData.computeIfAbsent(tsKvEntry.getTs(), k -> new HashMap<>()).put(tsKvEntry.getKey(), tsKvEntry.getValue());

View File

@ -293,6 +293,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
} else if (toCoreMsg.hasDeviceInactivityMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceInactivityMsg());
forwardToStateService(toCoreMsg.getDeviceInactivityMsg(), callback);
} else if (toCoreMsg.hasDeviceInactivityTimeoutUpdateMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceInactivityTimeoutUpdateMsg());
forwardToStateService(toCoreMsg.getDeviceInactivityTimeoutUpdateMsg(), callback);
} else if (toCoreMsg.hasToDeviceActorNotification()) {
TbActorMsg actorMsg = ProtoUtils.fromProto(toCoreMsg.getToDeviceActorNotification());
if (actorMsg != null) {
@ -658,6 +661,21 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
});
}
void forwardToStateService(TransportProtos.DeviceInactivityTimeoutUpdateProto deviceInactivityTimeoutUpdateMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceInactivityTimeoutUpdateMsg);
}
var tenantId = toTenantId(deviceInactivityTimeoutUpdateMsg.getTenantIdMSB(), deviceInactivityTimeoutUpdateMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceInactivityTimeoutUpdateMsg.getDeviceIdMSB(), deviceInactivityTimeoutUpdateMsg.getDeviceIdLSB()));
ListenableFuture<?> future = deviceActivityEventsExecutor.submit(() -> stateService.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, deviceInactivityTimeoutUpdateMsg.getInactivityTimeout()));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process device inactivity timeout update message for device [{}]", tenantId.getId(), deviceId.getId(), t);
callback.onFailure(t);
});
}
private void forwardToNotificationSchedulerService(TransportProtos.NotificationSchedulerServiceMsg msg, TbCallback callback) {
TenantId tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
NotificationRequestId notificationRequestId = new NotificationRequestId(new UUID(msg.getRequestIdMSB(), msg.getRequestIdLSB()));

View File

@ -40,6 +40,7 @@ public class TbCoreConsumerStats {
public static final String DEVICE_ACTIVITIES = "deviceActivity";
public static final String DEVICE_DISCONNECTS = "deviceDisconnect";
public static final String DEVICE_INACTIVITIES = "deviceInactivity";
public static final String DEVICE_INACTIVITY_TIMEOUT_UPDATES = "deviceInactivityTimeoutUpdate";
public static final String TO_CORE_NF_OTHER = "coreNfOther"; // normally, there is no messages when codebase is fine
public static final String TO_CORE_NF_COMPONENT_LIFECYCLE = "coreNfCompLfcl";
@ -65,6 +66,7 @@ public class TbCoreConsumerStats {
private final StatsCounter deviceActivitiesCounter;
private final StatsCounter deviceDisconnectsCounter;
private final StatsCounter deviceInactivitiesCounter;
private final StatsCounter deviceInactivityTimeoutUpdatesCounter;
private final StatsCounter toCoreNfOtherCounter;
private final StatsCounter toCoreNfComponentLifecycleCounter;
@ -95,6 +97,7 @@ public class TbCoreConsumerStats {
this.deviceActivitiesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_ACTIVITIES));
this.deviceDisconnectsCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_DISCONNECTS));
this.deviceInactivitiesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_INACTIVITIES));
this.deviceInactivityTimeoutUpdatesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_INACTIVITY_TIMEOUT_UPDATES));
// Core notification counters
this.toCoreNfOtherCounter = register(statsFactory.createStatsCounter(statsKey, TO_CORE_NF_OTHER));
@ -163,6 +166,11 @@ public class TbCoreConsumerStats {
deviceInactivitiesCounter.increment();
}
public void log(TransportProtos.DeviceInactivityTimeoutUpdateProto msg) {
totalCounter.increment();
deviceInactivityTimeoutUpdatesCounter.increment();
}
public void log(TransportProtos.SubscriptionMgrMsgProto msg) {
totalCounter.increment();
subscriptionMsgCounter.increment();

View File

@ -0,0 +1,180 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.state;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;
@Slf4j
@Service
@RequiredArgsConstructor
public class DefaultDeviceStateManager implements DeviceStateManager {
private final TbServiceInfoProvider serviceInfoProvider;
private final PartitionService partitionService;
private final Optional<DeviceStateService> deviceStateService;
private final TbClusterService clusterService;
@Override
public void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long connectTime, TbCallback callback) {
forwardToDeviceStateService(tenantId, deviceId,
deviceStateService -> {
log.debug("[{}][{}] Forwarding device connect event to local service. Connect time: [{}].", tenantId.getId(), deviceId.getId(), connectTime);
deviceStateService.onDeviceConnect(tenantId, deviceId, connectTime);
},
() -> {
log.debug("[{}][{}] Sending device connect message to core. Connect time: [{}].", tenantId.getId(), deviceId.getId(), connectTime);
var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastConnectTime(connectTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceConnectMsg(deviceConnectMsg)
.build();
}, callback);
}
@Override
public void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long activityTime, TbCallback callback) {
forwardToDeviceStateService(tenantId, deviceId,
deviceStateService -> {
log.debug("[{}][{}] Forwarding device activity event to local service. Activity time: [{}].", tenantId.getId(), deviceId.getId(), activityTime);
deviceStateService.onDeviceActivity(tenantId, deviceId, activityTime);
},
() -> {
log.debug("[{}][{}] Sending device activity message to core. Activity time: [{}].", tenantId.getId(), deviceId.getId(), activityTime);
var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastActivityTime(activityTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceActivityMsg(deviceActivityMsg)
.build();
}, callback);
}
@Override
public void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId, long disconnectTime, TbCallback callback) {
forwardToDeviceStateService(tenantId, deviceId,
deviceStateService -> {
log.debug("[{}][{}] Forwarding device disconnect event to local service. Disconnect time: [{}].", tenantId.getId(), deviceId.getId(), disconnectTime);
deviceStateService.onDeviceDisconnect(tenantId, deviceId, disconnectTime);
},
() -> {
log.debug("[{}][{}] Sending device disconnect message to core. Disconnect time: [{}].", tenantId.getId(), deviceId.getId(), disconnectTime);
var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastDisconnectTime(disconnectTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceDisconnectMsg(deviceDisconnectMsg)
.build();
}, callback);
}
@Override
public void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long inactivityTime, TbCallback callback) {
forwardToDeviceStateService(tenantId, deviceId,
deviceStateService -> {
log.debug("[{}][{}] Forwarding device inactivity event to local service. Inactivity time: [{}].", tenantId.getId(), deviceId.getId(), inactivityTime);
deviceStateService.onDeviceInactivity(tenantId, deviceId, inactivityTime);
},
() -> {
log.debug("[{}][{}] Sending device inactivity message to core. Inactivity time: [{}].", tenantId.getId(), deviceId.getId(), inactivityTime);
var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastInactivityTime(inactivityTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceInactivityMsg(deviceInactivityMsg)
.build();
}, callback);
}
@Override
public void onDeviceInactivityTimeoutUpdate(TenantId tenantId, DeviceId deviceId, long inactivityTimeout, TbCallback callback) {
forwardToDeviceStateService(tenantId, deviceId,
deviceStateService -> {
log.debug("[{}][{}] Forwarding device inactivity timeout update to local service. Updated inactivity timeout: [{}].", tenantId.getId(), deviceId.getId(), inactivityTimeout);
deviceStateService.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, inactivityTimeout);
},
() -> {
log.debug("[{}][{}] Sending device inactivity timeout update message to core. Updated inactivity timeout: [{}].", tenantId.getId(), deviceId.getId(), inactivityTimeout);
var deviceInactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setInactivityTimeout(inactivityTimeout)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceInactivityTimeoutUpdateMsg(deviceInactivityTimeoutUpdateMsg)
.build();
}, callback);
}
private void forwardToDeviceStateService(
TenantId tenantId, DeviceId deviceId,
Consumer<DeviceStateService> toDeviceStateService,
Supplier<TransportProtos.ToCoreMsg> toCore,
TbCallback callback
) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
if (serviceInfoProvider.isService(ServiceType.TB_CORE) && tpi.isMyPartition() && deviceStateService.isPresent()) {
try {
toDeviceStateService.accept(deviceStateService.get());
} catch (Exception e) {
log.error("[{}][{}] Failed to process device connectivity event.", tenantId.getId(), deviceId.getId(), e);
callback.onFailure(e);
return;
}
callback.onSuccess();
} else {
TransportProtos.ToCoreMsg toCoreMsg = toCore.get();
clusterService.pushMsgToCore(tpi, deviceId.getId(), toCoreMsg, new SimpleTbQueueCallback(__ -> callback.onSuccess(), callback::onFailure));
}
}
}

View File

@ -96,6 +96,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
@ -129,11 +130,10 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
private static final List<EntityKey> PERSISTENT_TELEMETRY_KEYS = Arrays.asList(
new EntityKey(EntityKeyType.TIME_SERIES, LAST_ACTIVITY_TIME),
new EntityKey(EntityKeyType.TIME_SERIES, INACTIVITY_ALARM_TIME),
new EntityKey(EntityKeyType.TIME_SERIES, INACTIVITY_TIMEOUT),
new EntityKey(EntityKeyType.TIME_SERIES, ACTIVITY_STATE),
new EntityKey(EntityKeyType.TIME_SERIES, LAST_CONNECT_TIME),
new EntityKey(EntityKeyType.TIME_SERIES, LAST_DISCONNECT_TIME),
new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT));
new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT)); // inactivity timeout is always a server attribute, even when activity data is stored as time series
private static final List<EntityKey> PERSISTENT_ATTRIBUTE_KEYS = Arrays.asList(
new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, LAST_ACTIVITY_TIME),
@ -143,8 +143,14 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, LAST_CONNECT_TIME),
new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, LAST_DISCONNECT_TIME));
public static final List<String> PERSISTENT_ATTRIBUTES = Arrays.asList(ACTIVITY_STATE, LAST_CONNECT_TIME,
LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT);
public static final Set<String> ACTIVITY_KEYS_WITHOUT_INACTIVITY_TIMEOUT = Set.of(
ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME
);
public static final Set<String> ACTIVITY_KEYS_WITH_INACTIVITY_TIMEOUT = Set.of(
ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT
);
private static final List<EntityKey> PERSISTENT_ENTITY_FIELDS = Arrays.asList(
new EntityKey(EntityKeyType.ENTITY_FIELD, "name"),
new EntityKey(EntityKeyType.ENTITY_FIELD, "type"),
@ -643,41 +649,45 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
deviceStates.remove(deviceId);
}
private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {
ListenableFuture<DeviceStateData> future;
if (persistToTelemetry) {
ListenableFuture<List<TsKvEntry>> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES);
future = Futures.transform(tsData, extractDeviceStateData(device), MoreExecutors.directExecutor());
ListenableFuture<List<TsKvEntry>> timeseriesActivityDataFuture = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), ACTIVITY_KEYS_WITHOUT_INACTIVITY_TIMEOUT);
ListenableFuture<Optional<AttributeKvEntry>> inactivityTimeoutAttributeFuture = attributesService.find(
TenantId.SYS_TENANT_ID, device.getId(), AttributeScope.SERVER_SCOPE, INACTIVITY_TIMEOUT
);
ListenableFuture<List<? extends KvEntry>> fullActivityDataFuture = Futures.whenAllSucceed(timeseriesActivityDataFuture, inactivityTimeoutAttributeFuture).call(() -> {
List<TsKvEntry> activityTimeseries = Futures.getDone(timeseriesActivityDataFuture);
Optional<AttributeKvEntry> inactivityTimeoutAttribute = Futures.getDone(inactivityTimeoutAttributeFuture);
List<KvEntry> result;
if (inactivityTimeoutAttribute.isPresent()) {
result = new ArrayList<>(activityTimeseries.size() + 1);
result.addAll(activityTimeseries);
inactivityTimeoutAttribute.ifPresent(result::add);
} else {
return activityTimeseries;
}
return result;
}, deviceStateCallbackExecutor);
future = Futures.transform(fullActivityDataFuture, extractDeviceStateData(device), MoreExecutors.directExecutor());
} else {
ListenableFuture<List<AttributeKvEntry>> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), AttributeScope.SERVER_SCOPE, PERSISTENT_ATTRIBUTES);
future = Futures.transform(attrData, extractDeviceStateData(device), MoreExecutors.directExecutor());
ListenableFuture<List<AttributeKvEntry>> attributesActivityDataFuture = attributesService.find(
TenantId.SYS_TENANT_ID, device.getId(), AttributeScope.SERVER_SCOPE, ACTIVITY_KEYS_WITH_INACTIVITY_TIMEOUT
);
future = Futures.transform(attributesActivityDataFuture, extractDeviceStateData(device), MoreExecutors.directExecutor());
}
return transformInactivityTimeout(future);
return future;
}
private ListenableFuture<DeviceStateData> transformInactivityTimeout(ListenableFuture<DeviceStateData> future) {
return Futures.transformAsync(future, deviceStateData -> {
if (!persistToTelemetry || deviceStateData.getState().getInactivityTimeout() != defaultInactivityTimeoutMs) {
return future; //fail fast
}
var attributesFuture = attributesService.find(TenantId.SYS_TENANT_ID, deviceStateData.getDeviceId(), AttributeScope.SERVER_SCOPE, INACTIVITY_TIMEOUT);
return Futures.transform(attributesFuture, attributes -> {
attributes.flatMap(KvEntry::getLongValue).ifPresent((inactivityTimeout) -> {
if (inactivityTimeout > 0) {
deviceStateData.getState().setInactivityTimeout(inactivityTimeout);
}
});
return deviceStateData;
}, MoreExecutors.directExecutor());
}, deviceStateCallbackExecutor);
}
private <T extends KvEntry> Function<List<T>, DeviceStateData> extractDeviceStateData(Device device) {
private Function<List<? extends KvEntry>, DeviceStateData> extractDeviceStateData(Device device) {
return new Function<>() {
@Nonnull
@Override
public DeviceStateData apply(@Nullable List<T> data) {
public DeviceStateData apply(@Nullable List<? extends KvEntry> data) {
try {
long lastActivityTime = getEntryValue(data, LAST_ACTIVITY_TIME, 0L);
long inactivityAlarmTime = getEntryValue(data, INACTIVITY_ALARM_TIME, 0L);
@ -690,7 +700,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
.lastDisconnectTime(getEntryValue(data, LAST_DISCONNECT_TIME, 0L))
.lastActivityTime(lastActivityTime)
.lastInactivityAlarmTime(inactivityAlarmTime)
.inactivityTimeout(inactivityTimeout)
.inactivityTimeout(inactivityTimeout > 0 ? inactivityTimeout : defaultInactivityTimeoutMs)
.build();
TbMsgMetaData md = new TbMsgMetaData();
md.putValue("deviceName", device.getName());
@ -761,12 +771,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
DeviceStateData toDeviceStateData(EntityData ed, DeviceIdInfo deviceIdInfo) {
long lastActivityTime = getEntryValue(ed, getKeyType(), LAST_ACTIVITY_TIME, 0L);
long inactivityAlarmTime = getEntryValue(ed, getKeyType(), INACTIVITY_ALARM_TIME, 0L);
long inactivityTimeout = getEntryValue(ed, getKeyType(), INACTIVITY_TIMEOUT, defaultInactivityTimeoutMs);
if (persistToTelemetry && inactivityTimeout == defaultInactivityTimeoutMs) {
log.trace("[{}] default value for inactivity timeout fetched {}, going to fetch inactivity timeout from attributes",
deviceIdInfo.getDeviceId(), inactivityTimeout);
inactivityTimeout = getEntryValue(ed, EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT, defaultInactivityTimeoutMs);
}
long inactivityTimeout = getEntryValue(ed, EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT, defaultInactivityTimeoutMs);
// Actual active state by wall-clock will be updated outside this method. This method is only for fetching persistent state
final boolean active = getEntryValue(ed, getKeyType(), ACTIVITY_STATE, false);
DeviceState deviceState = DeviceState.builder()

View File

@ -1,196 +0,0 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.state;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import java.util.Optional;
import java.util.UUID;
@Slf4j
@Service
@TbRuleEngineComponent
public class DefaultRuleEngineDeviceStateManager implements RuleEngineDeviceStateManager {
private final TbServiceInfoProvider serviceInfoProvider;
private final PartitionService partitionService;
private final Optional<DeviceStateService> deviceStateService;
private final TbClusterService clusterService;
public DefaultRuleEngineDeviceStateManager(
TbServiceInfoProvider serviceInfoProvider, PartitionService partitionService,
Optional<DeviceStateService> deviceStateServiceOptional, TbClusterService clusterService
) {
this.serviceInfoProvider = serviceInfoProvider;
this.partitionService = partitionService;
this.deviceStateService = deviceStateServiceOptional;
this.clusterService = clusterService;
}
@Getter
private abstract static class ConnectivityEventInfo {
private final TenantId tenantId;
private final DeviceId deviceId;
private final long eventTime;
private ConnectivityEventInfo(TenantId tenantId, DeviceId deviceId, long eventTime) {
this.tenantId = tenantId;
this.deviceId = deviceId;
this.eventTime = eventTime;
}
abstract void forwardToLocalService();
abstract TransportProtos.ToCoreMsg toQueueMsg();
}
@Override
public void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long connectTime, TbCallback callback) {
routeEvent(new ConnectivityEventInfo(tenantId, deviceId, connectTime) {
@Override
void forwardToLocalService() {
deviceStateService.ifPresent(service -> service.onDeviceConnect(tenantId, deviceId, connectTime));
}
@Override
TransportProtos.ToCoreMsg toQueueMsg() {
var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastConnectTime(connectTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceConnectMsg(deviceConnectMsg)
.build();
}
}, callback);
}
@Override
public void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long activityTime, TbCallback callback) {
routeEvent(new ConnectivityEventInfo(tenantId, deviceId, activityTime) {
@Override
void forwardToLocalService() {
deviceStateService.ifPresent(service -> service.onDeviceActivity(tenantId, deviceId, activityTime));
}
@Override
TransportProtos.ToCoreMsg toQueueMsg() {
var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastActivityTime(activityTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceActivityMsg(deviceActivityMsg)
.build();
}
}, callback);
}
@Override
public void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId, long disconnectTime, TbCallback callback) {
routeEvent(new ConnectivityEventInfo(tenantId, deviceId, disconnectTime) {
@Override
void forwardToLocalService() {
deviceStateService.ifPresent(service -> service.onDeviceDisconnect(tenantId, deviceId, disconnectTime));
}
@Override
TransportProtos.ToCoreMsg toQueueMsg() {
var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastDisconnectTime(disconnectTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceDisconnectMsg(deviceDisconnectMsg)
.build();
}
}, callback);
}
@Override
public void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long inactivityTime, TbCallback callback) {
routeEvent(new ConnectivityEventInfo(tenantId, deviceId, inactivityTime) {
@Override
void forwardToLocalService() {
deviceStateService.ifPresent(service -> service.onDeviceInactivity(tenantId, deviceId, inactivityTime));
}
@Override
TransportProtos.ToCoreMsg toQueueMsg() {
var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastInactivityTime(inactivityTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceInactivityMsg(deviceInactivityMsg)
.build();
}
}, callback);
}
private void routeEvent(ConnectivityEventInfo eventInfo, TbCallback callback) {
var tenantId = eventInfo.getTenantId();
var deviceId = eventInfo.getDeviceId();
long eventTime = eventInfo.getEventTime();
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
if (serviceInfoProvider.isService(ServiceType.TB_CORE) && tpi.isMyPartition() && deviceStateService.isPresent()) {
log.debug("[{}][{}] Forwarding device connectivity event to local service. Event time: [{}].", tenantId.getId(), deviceId.getId(), eventTime);
try {
eventInfo.forwardToLocalService();
} catch (Exception e) {
log.error("[{}][{}] Failed to process device connectivity event. Event time: [{}].", tenantId.getId(), deviceId.getId(), eventTime, e);
callback.onFailure(e);
return;
}
callback.onSuccess();
} else {
TransportProtos.ToCoreMsg msg = eventInfo.toQueueMsg();
log.debug("[{}][{}] Sending device connectivity message to core. Event time: [{}].", tenantId.getId(), deviceId.getId(), eventTime);
clusterService.pushMsgToCore(tpi, UUID.randomUUID(), msg, new SimpleTbQueueCallback(__ -> callback.onSuccess(), callback::onFailure));
}
}
}

View File

@ -161,9 +161,6 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
@Override
public void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, TbCallback callback) {
onTimeSeriesUpdate(entityId, ts);
if (entityId.getEntityType() == EntityType.DEVICE) {
updateDeviceInactivityTimeout(tenantId, entityId, ts);
}
callback.onSuccess();
}
@ -171,9 +168,6 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
public void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, TbCallback callback) {
onTimeSeriesUpdate(entityId,
keys.stream().map(key -> new BasicTsKvEntry(0, new StringDataEntry(key, ""))).collect(Collectors.toList()));
if (entityId.getEntityType() == EntityType.DEVICE) {
deleteDeviceInactivityTimeout(tenantId, entityId, keys);
}
callback.onSuccess();
}

View File

@ -259,8 +259,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
.strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS)
.callback(new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void tmp) {
}
public void onSuccess(@Nullable Void tmp) {}
@Override
public void onFailure(Throwable t) {
@ -324,6 +323,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
addMainCallback(saveFuture, result -> callback.onSuccess(null), callback::onFailure);
}
private <S> void addMainCallback(ListenableFuture<S> saveFuture, Consumer<S> onSuccess) {
addMainCallback(saveFuture, onSuccess, null);
}
private <S> void addMainCallback(ListenableFuture<S> saveFuture, Consumer<S> onSuccess, Consumer<Throwable> onFailure) {
DonAsynchron.withCallback(saveFuture, onSuccess, onFailure, tsCallBackExecutor);
}
@ -345,13 +348,12 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}
@Override
public void onFailure(Throwable t) {
}
public void onFailure(Throwable t) {}
};
}
private FutureCallback<Void> getCalculatedFieldCallback(FutureCallback<List<String>> originalCallback, List<String> keys) {
return new FutureCallback<Void>() {
return new FutureCallback<>() {
@Override
public void onSuccess(Void unused) {
originalCallback.onSuccess(keys);

View File

@ -532,6 +532,98 @@ public class DefaultTbCoreConsumerServiceTest {
then(statsMock).should(never()).log(inactivityMsg);
}
@Test
public void givenProcessingSuccess_whenForwardingInactivityTimeoutUpdateMsgToStateService_thenOnSuccessCallbackIsCalled() {
// GIVEN
var inactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setInactivityTimeout(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock);
// THEN
then(stateServiceMock).should().onDeviceInactivityTimeoutUpdate(tenantId, deviceId, time);
then(tbCallbackMock).should().onSuccess();
then(tbCallbackMock).should(never()).onFailure(any());
}
@Test
public void givenProcessingFailure_whenForwardingInactivityTimeoutUpdateMsgToStateService_thenOnFailureCallbackIsCalled() {
// GIVEN
var inactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setInactivityTimeout(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock);
var runtimeException = new RuntimeException("Something bad happened!");
doThrow(runtimeException).when(stateServiceMock).onDeviceInactivityTimeoutUpdate(tenantId, deviceId, time);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock);
// THEN
then(tbCallbackMock).should(never()).onSuccess();
then(tbCallbackMock).should().onFailure(runtimeException);
}
@Test
public void givenStatsEnabled_whenForwardingInactivityTimeoutUpdateMsgToStateService_thenStatsAreRecorded() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", true);
var inactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setInactivityTimeout(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock);
// THEN
then(statsMock).should().log(inactivityTimeoutUpdateMsg);
}
@Test
public void givenStatsDisabled_whenForwardingInactivityTimeoutUpdateMsgToStateService_thenStatsAreNotRecorded() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", false);
var inactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setInactivityTimeout(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock);
// THEN
then(statsMock).should(never()).log(inactivityTimeoutUpdateMsg);
}
@Test
public void givenRestApiCallResponseMsgProto_whenForwardToRuleEngineCallService_thenCallOnQueueMsg() {
// GIVEN
@ -545,4 +637,5 @@ public class DefaultTbCoreConsumerServiceTest {
// THEN
then(ruleEngineCallServiceMock).should().onQueueMsg(restApiCallResponseMsgProto, tbCallbackMock);
}
}

View File

@ -52,7 +52,7 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
@ExtendWith(MockitoExtension.class)
public class DefaultRuleEngineDeviceStateManagerTest {
public class DefaultDeviceStateManagerTest {
@Mock
private DeviceStateService deviceStateServiceMock;
@ -71,7 +71,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
@Captor
private ArgumentCaptor<TbQueueCallback> queueCallbackCaptor;
private DefaultRuleEngineDeviceStateManager deviceStateManager;
private DefaultDeviceStateManager deviceStateManager;
private static final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("57ab2e6c-bc4c-11ee-a506-0242ac120002"));
private static final DeviceId DEVICE_ID = DeviceId.fromString("74a9053e-bc4c-11ee-a506-0242ac120002");
@ -82,7 +82,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
@BeforeEach
public void setup() {
deviceStateManager = new DefaultRuleEngineDeviceStateManager(serviceInfoProviderMock, partitionServiceMock, Optional.of(deviceStateServiceMock), clusterServiceMock);
deviceStateManager = new DefaultDeviceStateManager(serviceInfoProviderMock, partitionServiceMock, Optional.of(deviceStateServiceMock), clusterServiceMock);
}
@ParameterizedTest
@ -90,7 +90,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
"when onDeviceX() is called, then should route event to local service and call onSuccess() callback.")
@MethodSource
public void givenRoutedToLocalAndProcessingSuccess_whenOnDeviceAction_thenShouldCallLocalServiceAndSuccessCallback(
BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback> onDeviceAction, Consumer<DeviceStateService> actionVerification
BiConsumer<DefaultDeviceStateManager, TbCallback> onDeviceAction, Consumer<DeviceStateService> actionVerification
) {
// GIVEN
given(serviceInfoProviderMock.isService(ServiceType.TB_CORE)).willReturn(true);
@ -109,20 +109,24 @@ public class DefaultRuleEngineDeviceStateManagerTest {
private static Stream<Arguments> givenRoutedToLocalAndProcessingSuccess_whenOnDeviceAction_thenShouldCallLocalServiceAndSuccessCallback() {
return Stream.of(
Arguments.of(
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS)
)
);
}
@ -132,7 +136,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
"when onDeviceX() is called, then should route event to local service and call onFailure() callback.")
@MethodSource
public void givenRoutedToLocalAndProcessingFailure_whenOnDeviceAction_thenShouldCallLocalServiceAndFailureCallback(
Consumer<DeviceStateService> exceptionThrowSetup, BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback> onDeviceAction, Consumer<DeviceStateService> actionVerification
Consumer<DeviceStateService> exceptionThrowSetup, BiConsumer<DefaultDeviceStateManager, TbCallback> onDeviceAction, Consumer<DeviceStateService> actionVerification
) {
// GIVEN
given(serviceInfoProviderMock.isService(ServiceType.TB_CORE)).willReturn(true);
@ -155,23 +159,28 @@ public class DefaultRuleEngineDeviceStateManagerTest {
return Stream.of(
Arguments.of(
(Consumer<DeviceStateService>) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS),
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Consumer<DeviceStateService>) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS),
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Consumer<DeviceStateService>) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS),
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Consumer<DeviceStateService>) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS),
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Consumer<DeviceStateService>) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS)
)
);
}
@ -181,7 +190,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
"when onDeviceX() is called, then should send correct queue message to external service with correct callback object.")
@MethodSource
public void givenRoutedToExternal_whenOnDeviceAction_thenShouldSendQueueMsgToExternalServiceWithCorrectCallback(
BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback> onDeviceAction, BiConsumer<TbClusterService, ArgumentCaptor<TbQueueCallback>> actionVerification
BiConsumer<DefaultDeviceStateManager, TbCallback> onDeviceAction, BiConsumer<TbClusterService, ArgumentCaptor<TbQueueCallback>> actionVerification
) {
// WHEN
ReflectionTestUtils.setField(deviceStateManager, "deviceStateService", Optional.empty());
@ -203,7 +212,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
private static Stream<Arguments> givenRoutedToExternal_whenOnDeviceAction_thenShouldSendQueueMsgToExternalServiceWithCorrectCallback() {
return Stream.of(
Arguments.of(
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<TbClusterService, ArgumentCaptor<TbQueueCallback>>) (clusterServiceMock, queueCallbackCaptor) -> {
var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
@ -219,7 +228,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
}
),
Arguments.of(
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<TbClusterService, ArgumentCaptor<TbQueueCallback>>) (clusterServiceMock, queueCallbackCaptor) -> {
var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
@ -235,7 +244,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
}
),
Arguments.of(
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<TbClusterService, ArgumentCaptor<TbQueueCallback>>) (clusterServiceMock, queueCallbackCaptor) -> {
var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
@ -251,7 +260,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
}
),
Arguments.of(
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<TbClusterService, ArgumentCaptor<TbQueueCallback>>) (clusterServiceMock, queueCallbackCaptor) -> {
var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
@ -265,6 +274,22 @@ public class DefaultRuleEngineDeviceStateManagerTest {
.build();
then(clusterServiceMock).should().pushMsgToCore(eq(EXTERNAL_TPI), any(UUID.class), eq(toCoreMsg), queueCallbackCaptor.capture());
}
),
Arguments.of(
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<TbClusterService, ArgumentCaptor<TbQueueCallback>>) (clusterServiceMock, queueCallbackCaptor) -> {
var deviceInactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits())
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits())
.setInactivityTimeout(EVENT_TS)
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceInactivityTimeoutUpdateMsg(deviceInactivityTimeoutUpdateMsg)
.build();
then(clusterServiceMock).should().pushMsgToCore(eq(EXTERNAL_TPI), any(UUID.class), eq(toCoreMsg), queueCallbackCaptor.capture());
}
)
);
}

View File

@ -38,9 +38,6 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityTrigger;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.TsValue;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
@ -88,7 +85,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.thingsboard.server.service.state.DefaultDeviceStateService.ACTIVITY_STATE;
import static org.thingsboard.server.service.state.DefaultDeviceStateService.INACTIVITY_ALARM_TIME;
import static org.thingsboard.server.service.state.DefaultDeviceStateService.INACTIVITY_TIMEOUT;
import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_ACTIVITY_TIME;
import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_CONNECT_TIME;
import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_DISCONNECT_TIME;
@ -508,42 +504,6 @@ public class DefaultDeviceStateServiceTest {
verify(service).fetchDeviceStateDataUsingSeparateRequests(deviceId);
}
@Test
public void givenPersistToTelemetryAndDefaultInactivityTimeoutFetched_whenTransformingToDeviceStateData_thenTryGetInactivityFromAttribute() {
var defaultInactivityTimeoutInSec = 60L;
var latest =
Map.of(
EntityKeyType.TIME_SERIES, Map.of(INACTIVITY_TIMEOUT, new TsValue(0, Long.toString(defaultInactivityTimeoutInSec * 1000))),
EntityKeyType.SERVER_ATTRIBUTE, Map.of(INACTIVITY_TIMEOUT, new TsValue(0, Long.toString(5000L)))
);
process(latest, defaultInactivityTimeoutInSec);
}
@Test
public void givenPersistToTelemetryAndNoInactivityTimeoutFetchedFromTimeSeries_whenTransformingToDeviceStateData_thenTryGetInactivityFromAttribute() {
var defaultInactivityTimeoutInSec = 60L;
var latest =
Map.of(
EntityKeyType.SERVER_ATTRIBUTE, Map.of(INACTIVITY_TIMEOUT, new TsValue(0, Long.toString(5000L)))
);
process(latest, defaultInactivityTimeoutInSec);
}
private void process(Map<EntityKeyType, Map<String, TsValue>> latest, long defaultInactivityTimeoutInSec) {
service.setDefaultInactivityTimeoutInSec(defaultInactivityTimeoutInSec);
service.setDefaultInactivityTimeoutMs(defaultInactivityTimeoutInSec * 1000);
service.setPersistToTelemetry(true);
var deviceUuid = UUID.randomUUID();
var deviceId = new DeviceId(deviceUuid);
DeviceStateData deviceStateData = service.toDeviceStateData(new EntityData(deviceId, latest, Map.of()), new DeviceIdInfo(TenantId.SYS_TENANT_ID.getId(), UUID.randomUUID(), deviceUuid));
assertThat(deviceStateData.getState().getInactivityTimeout()).isEqualTo(5000L);
}
private void initStateService(long timeout) throws InterruptedException {
service.stop();
reset(service, telemetrySubscriptionService);

View File

@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.ApiUsageStateValue;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.id.ApiUsageStateId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
@ -74,6 +75,7 @@ import java.util.stream.Stream;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
@ -168,6 +170,28 @@ class DefaultTelemetrySubscriptionServiceTest {
tsCallBackExecutor.shutdownNow();
}
/* --- Save time series API --- */
@Test
void shouldThrowErrorWhenTryingToSaveTimeseriesForApiUsageState() {
// GIVEN
var request = TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(customerId)
.entityId(new ApiUsageStateId(UUID.randomUUID()))
.entries(sampleTelemetry)
.strategy(TimeseriesSaveRequest.Strategy.PROCESS_ALL)
.build();
// WHEN
assertThatThrownBy(() -> telemetryService.saveTimeseries(request))
.isInstanceOf(RuntimeException.class)
.hasMessage("Can't update API Usage State!");
// THEN
then(tsService).shouldHaveNoInteractions();
}
@Test
void shouldReportStorageDataPointsApiUsageWhenTimeSeriesIsSaved() {
// GIVEN
@ -377,7 +401,7 @@ class DefaultTelemetrySubscriptionServiceTest {
);
}
// used to emulate versions returned by save latest API
// used to emulate sequence numbers returned by save latest API
private static List<Long> listOfNNumbers(int N) {
return LongStream.range(0, N).boxed().toList();
}

View File

@ -19,7 +19,6 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.AssetProfileId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;

View File

@ -375,8 +375,7 @@ public final class TbMsg implements Serializable {
protected TbMsgProcessingCtx ctx;
protected TbMsgCallback callback;
TbMsgBuilder() {
}
TbMsgBuilder() {}
TbMsgBuilder(TbMsg tbMsg) {
this.queueName = tbMsg.queueName;

View File

@ -809,6 +809,14 @@ message DeviceInactivityProto {
int64 lastInactivityTime = 5;
}
message DeviceInactivityTimeoutUpdateProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 deviceIdMSB = 3;
int64 deviceIdLSB = 4;
int64 inactivityTimeout = 5;
}
message CalculatedFieldTelemetryMsgProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
@ -1621,6 +1629,7 @@ message ToCoreMsg {
DeviceConnectProto deviceConnectMsg = 50;
DeviceDisconnectProto deviceDisconnectMsg = 51;
DeviceInactivityProto deviceInactivityMsg = 52;
DeviceInactivityTimeoutUpdateProto deviceInactivityTimeoutUpdateMsg = 53;
}
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */

View File

@ -36,6 +36,9 @@ public class DonAsynchron {
FutureCallback<T> callback = new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
if (onSuccess == null) {
return;
}
try {
onSuccess.accept(result);
} catch (Throwable th) {
@ -45,6 +48,9 @@ public class DonAsynchron {
@Override
public void onFailure(Throwable t) {
if (onFailure == null) {
return;
}
onFailure.accept(t);
}
};

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.dao.sql.cf;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.jpa.repository.JpaRepository;

View File

@ -15,6 +15,4 @@
*/
package org.thingsboard.server.dao.sql.device;
public interface NativeAssetRepository extends NativeProfileEntityRepository {
}
public interface NativeAssetRepository extends NativeProfileEntityRepository {}

View File

@ -24,5 +24,4 @@ public interface NativeDeviceRepository extends NativeProfileEntityRepository {
PageData<DeviceIdInfo> findDeviceIdInfos(Pageable pageable);
}

View File

@ -14,22 +14,6 @@
-- limitations under the License.
--
--
-- ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
CREATE TABLE IF NOT EXISTS tb_schema_settings
(
schema_version bigint NOT NULL,

View File

@ -19,7 +19,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TbCallback;
public interface RuleEngineDeviceStateManager {
public interface DeviceStateManager {
void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long connectTime, TbCallback callback);
@ -29,4 +29,6 @@ public interface RuleEngineDeviceStateManager {
void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long inactivityTime, TbCallback callback);
void onDeviceInactivityTimeoutUpdate(TenantId tenantId, DeviceId deviceId, long inactivityTimeout, TbCallback callback);
}

View File

@ -280,7 +280,7 @@ public interface TbContext {
DeviceCredentialsService getDeviceCredentialsService();
RuleEngineDeviceStateManager getDeviceStateManager();
DeviceStateManager getDeviceStateManager();
String getDeviceStateNodeRateLimitConfig();

View File

@ -58,8 +58,7 @@ public class TimeseriesDeleteRequest implements CalculatedFieldSystemAwareReques
private TbMsgType tbMsgType;
private FutureCallback<List<String>> callback;
Builder() {
}
Builder() {}
public Builder tenantId(TenantId tenantId) {
this.tenantId = tenantId;

View File

@ -17,7 +17,7 @@ package org.thingsboard.rule.engine.action;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
@ -119,7 +119,7 @@ public class TbDeviceStateNode implements TbNode {
TenantId tenantId = ctx.getTenantId();
long eventTs = msg.getMetaDataTs();
RuleEngineDeviceStateManager deviceStateManager = ctx.getDeviceStateManager();
DeviceStateManager deviceStateManager = ctx.getDeviceStateManager();
TbCallback callback = getMsgEnqueuedCallback(ctx, msg);
switch (event) {

View File

@ -29,7 +29,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
@ -66,7 +66,7 @@ public class TbDeviceStateNodeTest {
@Mock
private TbContext ctxMock;
@Mock
private RuleEngineDeviceStateManager deviceStateManagerMock;
private DeviceStateManager deviceStateManagerMock;
@Captor
private ArgumentCaptor<TbCallback> callbackCaptor;
private TbDeviceStateNode node;
@ -263,7 +263,7 @@ public class TbDeviceStateNodeTest {
@ParameterizedTest
@MethodSource
public void givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback(TbMsgType supportedEventType, BiConsumer<RuleEngineDeviceStateManager, ArgumentCaptor<TbCallback>> actionVerification) {
public void givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback(TbMsgType supportedEventType, BiConsumer<DeviceStateManager, ArgumentCaptor<TbCallback>> actionVerification) {
// GIVEN
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
given(ctxMock.getDeviceStateNodeRateLimitConfig()).willReturn("1:1");
@ -297,10 +297,10 @@ public class TbDeviceStateNodeTest {
private static Stream<Arguments> givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback() {
return Stream.of(
Arguments.of(TbMsgType.CONNECT_EVENT, (BiConsumer<RuleEngineDeviceStateManager, ArgumentCaptor<TbCallback>>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceConnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.ACTIVITY_EVENT, (BiConsumer<RuleEngineDeviceStateManager, ArgumentCaptor<TbCallback>>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceActivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.DISCONNECT_EVENT, (BiConsumer<RuleEngineDeviceStateManager, ArgumentCaptor<TbCallback>>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceDisconnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.INACTIVITY_EVENT, (BiConsumer<RuleEngineDeviceStateManager, ArgumentCaptor<TbCallback>>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceInactivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture()))
Arguments.of(TbMsgType.CONNECT_EVENT, (BiConsumer<DeviceStateManager, ArgumentCaptor<TbCallback>>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceConnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.ACTIVITY_EVENT, (BiConsumer<DeviceStateManager, ArgumentCaptor<TbCallback>>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceActivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.DISCONNECT_EVENT, (BiConsumer<DeviceStateManager, ArgumentCaptor<TbCallback>>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceDisconnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.INACTIVITY_EVENT, (BiConsumer<DeviceStateManager, ArgumentCaptor<TbCallback>>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceInactivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture()))
);
}