Merge pull request #9030 from dskarzh/feature/device-state-node

Device state rule node; device state service improvements
This commit is contained in:
Andrew Shvayka 2024-02-16 12:18:39 +02:00 committed by GitHub
commit d5da538e09
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 2282 additions and 100 deletions

View File

@ -31,6 +31,7 @@ import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.SmsService;
import org.thingsboard.rule.engine.api.slack.SlackService;
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
@ -203,6 +204,10 @@ public class ActorSystemContext {
@Getter
private DeviceCredentialsService deviceCredentialsService;
@Autowired(required = false)
@Getter
private RuleEngineDeviceStateManager deviceStateManager;
@Autowired
@Getter
private TbTenantProfileCache tenantProfileCache;
@ -556,6 +561,10 @@ public class ActorSystemContext {
@Getter
private boolean externalNodeForceAck;
@Value("${state.rule.node.deviceState.rateLimit:1:1,30:60,60:3600}")
@Getter
private String deviceStateNodeRateLimitConfig;
@Getter
@Setter
private TbActorSystem actorSystem;

View File

@ -28,6 +28,7 @@ import org.thingsboard.rule.engine.api.RuleEngineAlarmService;
import org.thingsboard.rule.engine.api.RuleEngineApiUsageStateService;
import org.thingsboard.rule.engine.api.RuleEngineAssetProfileCache;
import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.ScriptEngine;
@ -108,6 +109,7 @@ import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider;
import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
import org.thingsboard.server.service.script.RuleNodeTbelScriptEngine;
@ -213,7 +215,19 @@ class DefaultTbContext implements TbContext {
if (nodeCtx.getSelf().isDebugMode()) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain");
}
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(onSuccess, onFailure));
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(
metadata -> {
if (onSuccess != null) {
onSuccess.run();
}
},
t -> {
if (onFailure != null) {
onFailure.accept(t);
} else {
log.debug("[{}] Failed to put item into queue!", nodeCtx.getTenantId().getId(), t);
}
}));
}
@Override
@ -299,7 +313,19 @@ class DefaultTbContext implements TbContext {
relationTypes.forEach(relationType ->
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, relationType, null, failureMessage));
}
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg.build(), new SimpleTbQueueCallback(onSuccess, onFailure));
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg.build(), new SimpleTbQueueCallback(
metadata -> {
if (onSuccess != null) {
onSuccess.run();
}
},
t -> {
if (onFailure != null) {
onFailure.accept(t);
} else {
log.debug("[{}] Failed to put item into queue!", nodeCtx.getTenantId().getId(), t);
}
}));
}
@Override
@ -658,6 +684,16 @@ class DefaultTbContext implements TbContext {
return mainCtx.getDeviceCredentialsService();
}
@Override
public RuleEngineDeviceStateManager getDeviceStateManager() {
return mainCtx.getDeviceStateManager();
}
@Override
public String getDeviceStateNodeRateLimitConfig() {
return mainCtx.getDeviceStateNodeRateLimitConfig();
}
@Override
public TbClusterService getClusterService() {
return mainCtx.getClusterService();
@ -952,29 +988,4 @@ class DefaultTbContext implements TbContext {
return failureMessage;
}
private class SimpleTbQueueCallback implements TbQueueCallback {
private final Runnable onSuccess;
private final Consumer<Throwable> onFailure;
public SimpleTbQueueCallback(Runnable onSuccess, Consumer<Throwable> onFailure) {
this.onSuccess = onSuccess;
this.onFailure = onFailure;
}
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
if (onSuccess != null) {
onSuccess.run();
}
}
@Override
public void onFailure(Throwable t) {
if (onFailure != null) {
onFailure.accept(t);
} else {
log.debug("[{}] Failed to put item into queue", nodeCtx.getTenantId(), t);
}
}
}
}

View File

@ -15,6 +15,9 @@
*/
package org.thingsboard.server.service.queue;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@ -148,6 +151,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
protected volatile ExecutorService consumersExecutor;
protected volatile ExecutorService usageStatsExecutor;
private volatile ExecutorService firmwareStatesExecutor;
private volatile ListeningExecutorService deviceActivityEventsExecutor;
public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory,
ActorSystemContext actorContext,
@ -195,6 +199,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
this.consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-core-consumer"));
this.usageStatsExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-usage-stats-consumer"));
this.firmwareStatesExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-firmware-notifications-consumer"));
this.deviceActivityEventsExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-device-activity-events-executor")));
}
@PreDestroy
@ -209,6 +214,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
if (firmwareStatesExecutor != null) {
firmwareStatesExecutor.shutdownNow();
}
if (deviceActivityEventsExecutor != null) {
deviceActivityEventsExecutor.shutdownNow();
}
}
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
@ -265,14 +273,23 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());
forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
} else if (toCoreMsg.hasDeviceStateServiceMsg()) {
log.trace("[{}] Forwarding message to state service {}", id, toCoreMsg.getDeviceStateServiceMsg());
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceStateServiceMsg());
forwardToStateService(toCoreMsg.getDeviceStateServiceMsg(), callback);
} else if (toCoreMsg.hasEdgeNotificationMsg()) {
log.trace("[{}] Forwarding message to edge service {}", id, toCoreMsg.getEdgeNotificationMsg());
forwardToEdgeNotificationService(toCoreMsg.getEdgeNotificationMsg(), callback);
} else if (toCoreMsg.hasDeviceConnectMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceConnectMsg());
forwardToStateService(toCoreMsg.getDeviceConnectMsg(), callback);
} else if (toCoreMsg.hasDeviceActivityMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceActivityMsg());
forwardToStateService(toCoreMsg.getDeviceActivityMsg(), callback);
} else if (toCoreMsg.hasDeviceDisconnectMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceDisconnectMsg());
forwardToStateService(toCoreMsg.getDeviceDisconnectMsg(), callback);
} else if (toCoreMsg.hasDeviceInactivityMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceInactivityMsg());
forwardToStateService(toCoreMsg.getDeviceInactivityMsg(), callback);
} else if (toCoreMsg.hasToDeviceActorNotification()) {
TbActorMsg actorMsg = ProtoUtils.fromProto(toCoreMsg.getToDeviceActorNotification());
if (actorMsg != null) {
@ -641,25 +658,71 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
}
}
private void forwardToStateService(DeviceStateServiceMsgProto deviceStateServiceMsg, TbCallback callback) {
void forwardToStateService(DeviceStateServiceMsgProto deviceStateServiceMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceStateServiceMsg);
}
stateService.onQueueMsg(deviceStateServiceMsg, callback);
}
private void forwardToStateService(TransportProtos.DeviceActivityProto deviceActivityMsg, TbCallback callback) {
void forwardToStateService(TransportProtos.DeviceConnectProto deviceConnectMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceConnectMsg);
}
var tenantId = toTenantId(deviceConnectMsg.getTenantIdMSB(), deviceConnectMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceConnectMsg.getDeviceIdMSB(), deviceConnectMsg.getDeviceIdLSB()));
ListenableFuture<?> future = deviceActivityEventsExecutor.submit(() -> stateService.onDeviceConnect(tenantId, deviceId, deviceConnectMsg.getLastConnectTime()));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process device connect message for device [{}]", tenantId.getId(), deviceId.getId(), t);
callback.onFailure(t);
});
}
void forwardToStateService(TransportProtos.DeviceActivityProto deviceActivityMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceActivityMsg);
}
TenantId tenantId = toTenantId(deviceActivityMsg.getTenantIdMSB(), deviceActivityMsg.getTenantIdLSB());
DeviceId deviceId = new DeviceId(new UUID(deviceActivityMsg.getDeviceIdMSB(), deviceActivityMsg.getDeviceIdLSB()));
try {
stateService.onDeviceActivity(tenantId, deviceId, deviceActivityMsg.getLastActivityTime());
callback.onSuccess();
} catch (Exception e) {
callback.onFailure(new RuntimeException("Failed update device activity for device [" + deviceId.getId() + "]!", e));
var tenantId = toTenantId(deviceActivityMsg.getTenantIdMSB(), deviceActivityMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceActivityMsg.getDeviceIdMSB(), deviceActivityMsg.getDeviceIdLSB()));
ListenableFuture<?> future = deviceActivityEventsExecutor.submit(() -> stateService.onDeviceActivity(tenantId, deviceId, deviceActivityMsg.getLastActivityTime()));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process device activity message for device [{}]", tenantId.getId(), deviceId.getId(), t);
callback.onFailure(new RuntimeException("Failed to update device activity for device [" + deviceId.getId() + "]!", t));
});
}
void forwardToStateService(TransportProtos.DeviceDisconnectProto deviceDisconnectMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceDisconnectMsg);
}
var tenantId = toTenantId(deviceDisconnectMsg.getTenantIdMSB(), deviceDisconnectMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceDisconnectMsg.getDeviceIdMSB(), deviceDisconnectMsg.getDeviceIdLSB()));
ListenableFuture<?> future = deviceActivityEventsExecutor.submit(() -> stateService.onDeviceDisconnect(tenantId, deviceId, deviceDisconnectMsg.getLastDisconnectTime()));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process device disconnect message for device [{}]", tenantId.getId(), deviceId.getId(), t);
callback.onFailure(t);
});
}
void forwardToStateService(TransportProtos.DeviceInactivityProto deviceInactivityMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceInactivityMsg);
}
var tenantId = toTenantId(deviceInactivityMsg.getTenantIdMSB(), deviceInactivityMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceInactivityMsg.getDeviceIdMSB(), deviceInactivityMsg.getDeviceIdLSB()));
ListenableFuture<?> future = deviceActivityEventsExecutor.submit(() -> stateService.onDeviceInactivity(tenantId, deviceId, deviceInactivityMsg.getLastInactivityTime()));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process device inactivity message for device [{}]", tenantId.getId(), deviceId.getId(), t);
callback.onFailure(t);
});
}
private void forwardToNotificationSchedulerService(TransportProtos.NotificationSchedulerServiceMsg msg, TbCallback callback) {

View File

@ -37,7 +37,10 @@ public class TbCoreConsumerStats {
public static final String DEVICE_STATES = "deviceState";
public static final String SUBSCRIPTION_MSGS = "subMsgs";
public static final String EDGE_NOTIFICATIONS = "edgeNfs";
public static final String DEVICE_CONNECTS = "deviceConnect";
public static final String DEVICE_ACTIVITIES = "deviceActivity";
public static final String DEVICE_DISCONNECTS = "deviceDisconnect";
public static final String DEVICE_INACTIVITIES = "deviceInactivity";
public static final String TO_CORE_NF_OTHER = "coreNfOther"; // normally, there is no messages when codebase is fine
public static final String TO_CORE_NF_COMPONENT_LIFECYCLE = "coreNfCompLfcl";
@ -63,7 +66,10 @@ public class TbCoreConsumerStats {
private final StatsCounter deviceStateCounter;
private final StatsCounter subscriptionMsgCounter;
private final StatsCounter edgeNotificationsCounter;
private final StatsCounter deviceConnectsCounter;
private final StatsCounter deviceActivitiesCounter;
private final StatsCounter deviceDisconnectsCounter;
private final StatsCounter deviceInactivitiesCounter;
private final StatsCounter toCoreNfOtherCounter;
private final StatsCounter toCoreNfComponentLifecycleCounter;
@ -94,7 +100,10 @@ public class TbCoreConsumerStats {
this.deviceStateCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_STATES));
this.subscriptionMsgCounter = register(statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS));
this.edgeNotificationsCounter = register(statsFactory.createStatsCounter(statsKey, EDGE_NOTIFICATIONS));
this.deviceConnectsCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_CONNECTS));
this.deviceActivitiesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_ACTIVITIES));
this.deviceDisconnectsCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_DISCONNECTS));
this.deviceInactivitiesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_INACTIVITIES));
// Core notification counters
this.toCoreNfOtherCounter = register(statsFactory.createStatsCounter(statsKey, TO_CORE_NF_OTHER));
@ -152,11 +161,26 @@ public class TbCoreConsumerStats {
edgeNotificationsCounter.increment();
}
public void log(TransportProtos.DeviceConnectProto msg) {
totalCounter.increment();
deviceConnectsCounter.increment();
}
public void log(TransportProtos.DeviceActivityProto msg) {
totalCounter.increment();
deviceActivitiesCounter.increment();
}
public void log(TransportProtos.DeviceDisconnectProto msg) {
totalCounter.increment();
deviceDisconnectsCounter.increment();
}
public void log(TransportProtos.DeviceInactivityProto msg) {
totalCounter.increment();
deviceInactivitiesCounter.increment();
}
public void log(TransportProtos.SubscriptionMgrMsgProto msg) {
totalCounter.increment();
subscriptionMsgCounter.increment();

View File

@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.exception.TenantNotFoundException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
@ -221,25 +222,35 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
@Override
public void onDeviceConnect(TenantId tenantId, DeviceId deviceId) {
if (cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId)) {
public void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long lastConnectTime) {
if (cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId)) {
return;
}
if (lastConnectTime < 0) {
log.trace("[{}][{}] On device connect: received negative last connect ts [{}]. Skipping this event.",
tenantId.getId(), deviceId.getId(), lastConnectTime);
return;
}
log.trace("on Device Connect [{}]", deviceId.getId());
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
long ts = getCurrentTimeMillis();
stateData.getState().setLastConnectTime(ts);
save(deviceId, LAST_CONNECT_TIME, ts);
long currentLastConnectTime = stateData.getState().getLastConnectTime();
if (lastConnectTime <= currentLastConnectTime) {
log.trace("[{}][{}] On device connect: received outdated last connect ts [{}]. Skipping this event. Current last connect ts [{}].",
tenantId.getId(), deviceId.getId(), lastConnectTime, currentLastConnectTime);
return;
}
log.trace("[{}][{}] On device connect: processing connect event with ts [{}].", tenantId.getId(), deviceId.getId(), lastConnectTime);
stateData.getState().setLastConnectTime(lastConnectTime);
save(deviceId, LAST_CONNECT_TIME, lastConnectTime);
pushRuleEngineMessage(stateData, TbMsgType.CONNECT_EVENT);
checkAndUpdateState(deviceId, stateData);
}
@Override
public void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long lastReportedActivity) {
if (cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId)) {
if (cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId)) {
return;
}
log.trace("on Device Activity [{}], lastReportedActivity [{}]", deviceId.getId(), lastReportedActivity);
log.trace("[{}] on Device Activity [{}], lastReportedActivity [{}]", tenantId.getId(), deviceId.getId(), lastReportedActivity);
final DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
if (lastReportedActivity > 0 && lastReportedActivity > stateData.getState().getLastActivityTime()) {
updateActivityState(deviceId, stateData, lastReportedActivity);
@ -261,37 +272,75 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
onDeviceActivityStatusChange(deviceId, true, stateData);
}
} else {
log.debug("updateActivityState - fetched state IN NULL for device {}, lastReportedActivity {}", deviceId, lastReportedActivity);
log.debug("updateActivityState - fetched state IS NULL for device {}, lastReportedActivity {}", deviceId, lastReportedActivity);
cleanupEntity(deviceId);
}
}
@Override
public void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId) {
if (cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId)) {
public void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId, long lastDisconnectTime) {
if (cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId)) {
return;
}
if (lastDisconnectTime < 0) {
log.trace("[{}][{}] On device disconnect: received negative last disconnect ts [{}]. Skipping this event.",
tenantId.getId(), deviceId.getId(), lastDisconnectTime);
return;
}
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
long ts = getCurrentTimeMillis();
stateData.getState().setLastDisconnectTime(ts);
save(deviceId, LAST_DISCONNECT_TIME, ts);
long currentLastDisconnectTime = stateData.getState().getLastDisconnectTime();
if (lastDisconnectTime <= currentLastDisconnectTime) {
log.trace("[{}][{}] On device disconnect: received outdated last disconnect ts [{}]. Skipping this event. Current last disconnect ts [{}].",
tenantId.getId(), deviceId.getId(), lastDisconnectTime, currentLastDisconnectTime);
return;
}
log.trace("[{}][{}] On device disconnect: processing disconnect event with ts [{}].", tenantId.getId(), deviceId.getId(), lastDisconnectTime);
stateData.getState().setLastDisconnectTime(lastDisconnectTime);
save(deviceId, LAST_DISCONNECT_TIME, lastDisconnectTime);
pushRuleEngineMessage(stateData, TbMsgType.DISCONNECT_EVENT);
}
@Override
public void onDeviceInactivityTimeoutUpdate(TenantId tenantId, DeviceId deviceId, long inactivityTimeout) {
if (cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId)) {
if (cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId)) {
return;
}
if (inactivityTimeout <= 0L) {
inactivityTimeout = defaultInactivityTimeoutMs;
}
log.trace("on Device Activity Timeout Update device id {} inactivityTimeout {}", deviceId, inactivityTimeout);
log.trace("[{}] on Device Activity Timeout Update device id {} inactivityTimeout {}", tenantId.getId(), deviceId.getId(), inactivityTimeout);
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
stateData.getState().setInactivityTimeout(inactivityTimeout);
checkAndUpdateState(deviceId, stateData);
}
@Override
public void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long lastInactivityTime) {
if (cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId)) {
return;
}
if (lastInactivityTime < 0) {
log.trace("[{}][{}] On device inactivity: received negative last inactivity ts [{}]. Skipping this event.",
tenantId.getId(), deviceId.getId(), lastInactivityTime);
return;
}
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
long currentLastInactivityAlarmTime = stateData.getState().getLastInactivityAlarmTime();
if (lastInactivityTime <= currentLastInactivityAlarmTime) {
log.trace("[{}][{}] On device inactivity: received last inactivity ts [{}] is less than current last inactivity ts [{}]. Skipping this event.",
tenantId.getId(), deviceId.getId(), lastInactivityTime, currentLastInactivityAlarmTime);
return;
}
long currentLastActivityTime = stateData.getState().getLastActivityTime();
if (lastInactivityTime <= currentLastActivityTime) {
log.trace("[{}][{}] On device inactivity: received last inactivity ts [{}] is less or equal to current last activity ts [{}]. Skipping this event.",
tenantId.getId(), deviceId.getId(), lastInactivityTime, currentLastActivityTime);
return;
}
log.trace("[{}][{}] On device inactivity: processing inactivity event with ts [{}].", tenantId.getId(), deviceId.getId(), lastInactivityTime);
reportInactivity(lastInactivityTime, deviceId, stateData);
}
@Override
public void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbCallback callback) {
try {
@ -497,10 +546,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
&& (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() <= state.getLastActivityTime())
&& stateData.getDeviceCreationTime() + state.getInactivityTimeout() <= ts) {
if (partitionService.resolve(ServiceType.TB_CORE, stateData.getTenantId(), deviceId).isMyPartition()) {
state.setActive(false);
state.setLastInactivityAlarmTime(ts);
onDeviceActivityStatusChange(deviceId, false, stateData);
save(deviceId, INACTIVITY_ALARM_TIME, ts);
reportInactivity(ts, deviceId, stateData);
} else {
cleanupEntity(deviceId);
}
@ -511,32 +557,34 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
}
private void reportInactivity(long ts, DeviceId deviceId, DeviceStateData stateData) {
DeviceState state = stateData.getState();
state.setActive(false);
state.setLastInactivityAlarmTime(ts);
save(deviceId, INACTIVITY_ALARM_TIME, ts);
onDeviceActivityStatusChange(deviceId, false, stateData);
}
boolean isActive(long ts, DeviceState state) {
return ts < state.getLastActivityTime() + state.getInactivityTimeout();
}
@Nonnull
DeviceStateData getOrFetchDeviceStateData(DeviceId deviceId) {
DeviceStateData deviceStateData = deviceStates.get(deviceId);
if (deviceStateData != null) {
return deviceStateData;
}
return fetchDeviceStateDataUsingEntityDataQuery(deviceId);
return deviceStates.computeIfAbsent(deviceId, this::fetchDeviceStateDataUsingSeparateRequests);
}
DeviceStateData fetchDeviceStateDataUsingEntityDataQuery(final DeviceId deviceId) {
DeviceStateData fetchDeviceStateDataUsingSeparateRequests(final DeviceId deviceId) {
final Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId);
if (device == null) {
log.warn("[{}] Failed to fetch device by Id!", deviceId);
throw new RuntimeException("Failed to fetch device by Id " + deviceId);
throw new RuntimeException("Failed to fetch device by id [" + deviceId + "]!");
}
try {
DeviceStateData deviceStateData = fetchDeviceState(device).get();
deviceStates.putIfAbsent(deviceId, deviceStateData);
return deviceStateData;
return fetchDeviceState(device).get();
} catch (InterruptedException | ExecutionException e) {
log.warn("[{}] Failed to fetch device state!", deviceId, e);
throw new RuntimeException(e);
throw new RuntimeException("Failed to fetch device state for device [" + deviceId + "]");
}
}
@ -553,7 +601,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
.build());
}
private boolean cleanDeviceStateIfBelongsExternalPartition(TenantId tenantId, final DeviceId deviceId) {
boolean cleanDeviceStateIfBelongsToExternalPartition(TenantId tenantId, final DeviceId deviceId) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
boolean cleanup = !partitionedEntities.containsKey(tpi);
if (cleanup) {
@ -621,7 +669,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
long lastActivityTime = getEntryValue(data, LAST_ACTIVITY_TIME, 0L);
long inactivityAlarmTime = getEntryValue(data, INACTIVITY_ALARM_TIME, 0L);
long inactivityTimeout = getEntryValue(data, INACTIVITY_TIMEOUT, defaultInactivityTimeoutMs);
//Actual active state by wall-clock will updated outside this method. This method is only for fetch persistent state
// Actual active state by wall-clock will be updated outside this method. This method is only for fetching persistent state
final boolean active = getEntryValue(data, ACTIVITY_STATE, false);
DeviceState deviceState = DeviceState.builder()
.active(active)
@ -646,7 +694,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
return deviceStateData;
} catch (Exception e) {
log.warn("[{}] Failed to fetch device state data", device.getId(), e);
throw new RuntimeException(e);
throw new RuntimeException("Failed to fetch device state data for device [" + device.getId() + "]", e);
}
}
};
@ -670,8 +718,13 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
return success ? result : result.stream().filter(Objects::nonNull).collect(Collectors.toList());
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn("Failed to initialized device state futures for ids: {} due to:", deviceIds, e);
throw new RuntimeException(e);
String deviceIdsStr = deviceIds.stream()
.map(DeviceIdInfo::getDeviceId)
.map(UUIDBased::getId)
.map(UUID::toString)
.collect(Collectors.joining(", "));
log.warn("Failed to initialized device state futures for ids [{}] due to:", deviceIdsStr, e);
throw new RuntimeException("Failed to initialized device state futures for ids [" + deviceIdsStr + "]!", e);
}
}
@ -701,7 +754,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
deviceIdInfo.getDeviceId(), inactivityTimeout);
inactivityTimeout = getEntryValue(ed, EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT, defaultInactivityTimeoutMs);
}
//Actual active state by wall-clock will updated outside this method. This method is only for fetch persistent state
// Actual active state by wall-clock will be updated outside this method. This method is only for fetching persistent state
final boolean active = getEntryValue(ed, getKeyType(), ACTIVITY_STATE, false);
DeviceState deviceState = DeviceState.builder()
.active(active)
@ -757,7 +810,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
return defaultValue;
}
private long getEntryValue(List<? extends KvEntry> kvEntries, String attributeName, long defaultValue) {
if (kvEntries != null) {
for (KvEntry entry : kvEntries) {

View File

@ -0,0 +1,196 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.state;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import java.util.Optional;
import java.util.UUID;
@Slf4j
@Service
@TbRuleEngineComponent
public class DefaultRuleEngineDeviceStateManager implements RuleEngineDeviceStateManager {
private final TbServiceInfoProvider serviceInfoProvider;
private final PartitionService partitionService;
private final Optional<DeviceStateService> deviceStateService;
private final TbClusterService clusterService;
public DefaultRuleEngineDeviceStateManager(
TbServiceInfoProvider serviceInfoProvider, PartitionService partitionService,
Optional<DeviceStateService> deviceStateServiceOptional, TbClusterService clusterService
) {
this.serviceInfoProvider = serviceInfoProvider;
this.partitionService = partitionService;
this.deviceStateService = deviceStateServiceOptional;
this.clusterService = clusterService;
}
@Getter
private abstract static class ConnectivityEventInfo {
private final TenantId tenantId;
private final DeviceId deviceId;
private final long eventTime;
private ConnectivityEventInfo(TenantId tenantId, DeviceId deviceId, long eventTime) {
this.tenantId = tenantId;
this.deviceId = deviceId;
this.eventTime = eventTime;
}
abstract void forwardToLocalService();
abstract TransportProtos.ToCoreMsg toQueueMsg();
}
@Override
public void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long connectTime, TbCallback callback) {
routeEvent(new ConnectivityEventInfo(tenantId, deviceId, connectTime) {
@Override
void forwardToLocalService() {
deviceStateService.ifPresent(service -> service.onDeviceConnect(tenantId, deviceId, connectTime));
}
@Override
TransportProtos.ToCoreMsg toQueueMsg() {
var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastConnectTime(connectTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceConnectMsg(deviceConnectMsg)
.build();
}
}, callback);
}
@Override
public void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long activityTime, TbCallback callback) {
routeEvent(new ConnectivityEventInfo(tenantId, deviceId, activityTime) {
@Override
void forwardToLocalService() {
deviceStateService.ifPresent(service -> service.onDeviceActivity(tenantId, deviceId, activityTime));
}
@Override
TransportProtos.ToCoreMsg toQueueMsg() {
var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastActivityTime(activityTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceActivityMsg(deviceActivityMsg)
.build();
}
}, callback);
}
@Override
public void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId, long disconnectTime, TbCallback callback) {
routeEvent(new ConnectivityEventInfo(tenantId, deviceId, disconnectTime) {
@Override
void forwardToLocalService() {
deviceStateService.ifPresent(service -> service.onDeviceDisconnect(tenantId, deviceId, disconnectTime));
}
@Override
TransportProtos.ToCoreMsg toQueueMsg() {
var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastDisconnectTime(disconnectTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceDisconnectMsg(deviceDisconnectMsg)
.build();
}
}, callback);
}
@Override
public void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long inactivityTime, TbCallback callback) {
routeEvent(new ConnectivityEventInfo(tenantId, deviceId, inactivityTime) {
@Override
void forwardToLocalService() {
deviceStateService.ifPresent(service -> service.onDeviceInactivity(tenantId, deviceId, inactivityTime));
}
@Override
TransportProtos.ToCoreMsg toQueueMsg() {
var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastInactivityTime(inactivityTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceInactivityMsg(deviceInactivityMsg)
.build();
}
}, callback);
}
private void routeEvent(ConnectivityEventInfo eventInfo, TbCallback callback) {
var tenantId = eventInfo.getTenantId();
var deviceId = eventInfo.getDeviceId();
long eventTime = eventInfo.getEventTime();
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
if (serviceInfoProvider.isService(ServiceType.TB_CORE) && tpi.isMyPartition() && deviceStateService.isPresent()) {
log.debug("[{}][{}] Forwarding device connectivity event to local service. Event time: [{}].", tenantId.getId(), deviceId.getId(), eventTime);
try {
eventInfo.forwardToLocalService();
} catch (Exception e) {
log.error("[{}][{}] Failed to process device connectivity event. Event time: [{}].", tenantId.getId(), deviceId.getId(), eventTime, e);
callback.onFailure(e);
return;
}
callback.onSuccess();
} else {
TransportProtos.ToCoreMsg msg = eventInfo.toQueueMsg();
log.debug("[{}][{}] Sending device connectivity message to core. Event time: [{}].", tenantId.getId(), deviceId.getId(), eventTime);
clusterService.pushMsgToCore(tpi, UUID.randomUUID(), msg, new SimpleTbQueueCallback(__ -> callback.onSuccess(), callback::onFailure));
}
}
}

View File

@ -27,11 +27,21 @@ import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
*/
public interface DeviceStateService extends ApplicationListener<PartitionChangeEvent> {
void onDeviceConnect(TenantId tenantId, DeviceId deviceId);
void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long lastConnectTime);
default void onDeviceConnect(TenantId tenantId, DeviceId deviceId) {
onDeviceConnect(tenantId, deviceId, System.currentTimeMillis());
}
void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long lastReportedActivityTime);
void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId);
void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId, long lastDisconnectTime);
default void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId) {
onDeviceDisconnect(tenantId, deviceId, System.currentTimeMillis());
}
void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long lastInactivityTime);
void onDeviceInactivityTimeoutUpdate(TenantId tenantId, DeviceId deviceId, long inactivityTimeout);

View File

@ -786,7 +786,7 @@ state:
defaultInactivityTimeoutInSec: "${DEFAULT_INACTIVITY_TIMEOUT:600}"
defaultStateCheckIntervalInSec: "${DEFAULT_STATE_CHECK_INTERVAL:60}" # Interval for checking the device state after a specified period. Time in seconds
# Controls whether we store the device 'active' flag in attributes (default) or telemetry.
# If you device to change this parameter, you should re-create the device info view as one of the following:
# If you decide to change this parameter, you should re-create the device info view as one of the following:
# If 'persistToTelemetry' is changed from 'false' to 'true': 'CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_ts_view;'
# If 'persistToTelemetry' is changed from 'true' to 'false': 'CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_attribute_view;'
persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:false}"
@ -794,6 +794,15 @@ state:
# Used only when state.persistToTelemetry is set to 'true' and Cassandra is used for timeseries data.
# 0 means time-to-live mechanism is disabled.
telemetryTtl: "${STATE_TELEMETRY_TTL:0}"
# Configuration properties for rule nodes related to device activity state
rule:
node:
# Device state rule node
deviceState:
# Defines the rate at which device connectivity events can be triggered.
# Comma-separated list of capacity:duration pairs that define bandwidth capacity and refill duration for token bucket rate limit algorithm.
# Refill is set to be greedy. Please refer to Bucket4j library documentation for more details.
rateLimit: "${DEVICE_STATE_NODE_RATE_LIMIT_CONFIGURATION:1:1,30:60,60:3600}"
# Tbel parameters
tbel:

View File

@ -0,0 +1,532 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.service.state.DeviceStateService;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
@ExtendWith(MockitoExtension.class)
public class DefaultTbCoreConsumerServiceTest {
@Mock
private DeviceStateService stateServiceMock;
@Mock
private TbCoreConsumerStats statsMock;
@Mock
private TbCallback tbCallbackMock;
private final TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
private final DeviceId deviceId = new DeviceId(UUID.randomUUID());
private final long time = System.currentTimeMillis();
private ListeningExecutorService executor;
@Mock
private DefaultTbCoreConsumerService defaultTbCoreConsumerServiceMock;
@BeforeEach
public void setup() {
executor = MoreExecutors.newDirectExecutorService();
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stateService", stateServiceMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "deviceActivityEventsExecutor", executor);
}
@AfterEach
public void cleanup() {
if (executor != null) {
executor.shutdown();
try {
if (!executor.awaitTermination(10L, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
@Test
public void givenProcessingSuccess_whenForwardingDeviceStateMsgToStateService_thenOnSuccessCallbackIsCalled() {
// GIVEN
var stateMsg = TransportProtos.DeviceStateServiceMsgProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setAdded(true)
.setUpdated(false)
.setDeleted(false)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(stateMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(stateMsg, tbCallbackMock);
// THEN
then(stateServiceMock).should().onQueueMsg(stateMsg, tbCallbackMock);
}
@Test
public void givenStatsEnabled_whenForwardingDeviceStateMsgToStateService_thenStatsAreRecorded() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", true);
var stateMsg = TransportProtos.DeviceStateServiceMsgProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setAdded(true)
.setUpdated(false)
.setDeleted(false)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(stateMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(stateMsg, tbCallbackMock);
// THEN
then(statsMock).should().log(stateMsg);
}
@Test
public void givenStatsDisabled_whenForwardingDeviceStateMsgToStateService_thenStatsAreNotRecorded() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", false);
var stateMsg = TransportProtos.DeviceStateServiceMsgProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setAdded(true)
.setUpdated(false)
.setDeleted(false)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(stateMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(stateMsg, tbCallbackMock);
// THEN
then(statsMock).should(never()).log(stateMsg);
}
@Test
public void givenProcessingSuccess_whenForwardingConnectMsgToStateService_thenOnSuccessCallbackIsCalled() {
// GIVEN
var connectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastConnectTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(connectMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(connectMsg, tbCallbackMock);
// THEN
then(stateServiceMock).should().onDeviceConnect(tenantId, deviceId, time);
then(tbCallbackMock).should().onSuccess();
then(tbCallbackMock).should(never()).onFailure(any());
}
@Test
public void givenProcessingFailure_whenForwardingConnectMsgToStateService_thenOnFailureCallbackIsCalled() {
// GIVEN
var connectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastConnectTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(connectMsg, tbCallbackMock);
var runtimeException = new RuntimeException("Something bad happened!");
doThrow(runtimeException).when(stateServiceMock).onDeviceConnect(tenantId, deviceId, time);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(connectMsg, tbCallbackMock);
// THEN
then(tbCallbackMock).should(never()).onSuccess();
then(tbCallbackMock).should().onFailure(runtimeException);
}
@Test
public void givenStatsEnabled_whenForwardingConnectMsgToStateService_thenStatsAreRecorded() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", true);
var connectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastConnectTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(connectMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(connectMsg, tbCallbackMock);
// THEN
then(statsMock).should().log(connectMsg);
}
@Test
public void givenStatsDisabled_whenForwardingConnectMsgToStateService_thenStatsAreNotRecorded() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", false);
var connectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastConnectTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(connectMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(connectMsg, tbCallbackMock);
// THEN
then(statsMock).should(never()).log(connectMsg);
}
@Test
public void givenProcessingSuccess_whenForwardingActivityMsgToStateService_thenOnSuccessCallbackIsCalled() {
// GIVEN
var activityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastActivityTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(activityMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(activityMsg, tbCallbackMock);
// THEN
then(stateServiceMock).should().onDeviceActivity(tenantId, deviceId, time);
then(tbCallbackMock).should().onSuccess();
then(tbCallbackMock).should(never()).onFailure(any());
}
@Test
public void givenProcessingFailure_whenForwardingActivityMsgToStateService_thenOnFailureCallbackIsCalled() {
// GIVEN
var activityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastActivityTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(activityMsg, tbCallbackMock);
var runtimeException = new RuntimeException("Something bad happened!");
doThrow(runtimeException).when(stateServiceMock).onDeviceActivity(tenantId, deviceId, time);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(activityMsg, tbCallbackMock);
// THEN
then(tbCallbackMock).should(never()).onSuccess();
var exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
then(tbCallbackMock).should().onFailure(exceptionCaptor.capture());
assertThat(exceptionCaptor.getValue())
.isInstanceOf(RuntimeException.class)
.hasMessage("Failed to update device activity for device [" + deviceId.getId() + "]!")
.hasCause(runtimeException);
}
@Test
public void givenStatsEnabled_whenForwardingActivityMsgToStateService_thenStatsAreRecorded() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", true);
var activityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastActivityTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(activityMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(activityMsg, tbCallbackMock);
// THEN
then(statsMock).should().log(activityMsg);
}
@Test
public void givenStatsDisabled_whenForwardingActivityMsgToStateService_thenStatsAreNotRecorded() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", false);
var activityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastActivityTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(activityMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(activityMsg, tbCallbackMock);
// THEN
then(statsMock).should(never()).log(activityMsg);
}
@Test
public void givenProcessingSuccess_whenForwardingDisconnectMsgToStateService_thenOnSuccessCallbackIsCalled() {
// GIVEN
var disconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastDisconnectTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(disconnectMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(disconnectMsg, tbCallbackMock);
// THEN
then(stateServiceMock).should().onDeviceDisconnect(tenantId, deviceId, time);
then(tbCallbackMock).should().onSuccess();
then(tbCallbackMock).should(never()).onFailure(any());
}
@Test
public void givenProcessingFailure_whenForwardingDisconnectMsgToStateService_thenOnFailureCallbackIsCalled() {
// GIVEN
var disconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastDisconnectTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(disconnectMsg, tbCallbackMock);
var runtimeException = new RuntimeException("Something bad happened!");
doThrow(runtimeException).when(stateServiceMock).onDeviceDisconnect(tenantId, deviceId, time);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(disconnectMsg, tbCallbackMock);
// THEN
then(tbCallbackMock).should(never()).onSuccess();
then(tbCallbackMock).should().onFailure(runtimeException);
}
@Test
public void givenStatsEnabled_whenForwardingDisconnectMsgToStateService_thenStatsAreRecorded() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", true);
var disconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastDisconnectTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(disconnectMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(disconnectMsg, tbCallbackMock);
// THEN
then(statsMock).should().log(disconnectMsg);
}
@Test
public void givenStatsDisabled_whenForwardingDisconnectMsgToStateService_thenStatsAreNotRecorded() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", false);
var disconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastDisconnectTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(disconnectMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(disconnectMsg, tbCallbackMock);
// THEN
then(statsMock).should(never()).log(disconnectMsg);
}
@Test
public void givenProcessingSuccess_whenForwardingInactivityMsgToStateService_thenOnSuccessCallbackIsCalled() {
// GIVEN
var inactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastInactivityTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityMsg, tbCallbackMock);
// THEN
then(stateServiceMock).should().onDeviceInactivity(tenantId, deviceId, time);
then(tbCallbackMock).should().onSuccess();
then(tbCallbackMock).should(never()).onFailure(any());
}
@Test
public void givenProcessingFailure_whenForwardingInactivityMsgToStateService_thenOnFailureCallbackIsCalled() {
// GIVEN
var inactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastInactivityTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityMsg, tbCallbackMock);
var runtimeException = new RuntimeException("Something bad happened!");
doThrow(runtimeException).when(stateServiceMock).onDeviceInactivity(tenantId, deviceId, time);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityMsg, tbCallbackMock);
// THEN
then(tbCallbackMock).should(never()).onSuccess();
then(tbCallbackMock).should().onFailure(runtimeException);
}
@Test
public void givenStatsEnabled_whenForwardingInactivityMsgToStateService_thenStatsAreRecorded() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", true);
var inactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastInactivityTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityMsg, tbCallbackMock);
// THEN
then(statsMock).should().log(inactivityMsg);
}
@Test
public void givenStatsDisabled_whenForwardingInactivityMsgToStateService_thenStatsAreNotRecorded() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", false);
var inactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastInactivityTime(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityMsg, tbCallbackMock);
// THEN
then(statsMock).should(never()).log(inactivityMsg);
}
}

View File

@ -21,11 +21,13 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.DeviceIdInfo;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
@ -54,19 +56,26 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
@ -74,6 +83,8 @@ import static org.thingsboard.server.service.state.DefaultDeviceStateService.ACT
import static org.thingsboard.server.service.state.DefaultDeviceStateService.INACTIVITY_ALARM_TIME;
import static org.thingsboard.server.service.state.DefaultDeviceStateService.INACTIVITY_TIMEOUT;
import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_ACTIVITY_TIME;
import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_CONNECT_TIME;
import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_DISCONNECT_TIME;
@ExtendWith(MockitoExtension.class)
public class DefaultDeviceStateServiceTest {
@ -116,21 +127,366 @@ public class DefaultDeviceStateServiceTest {
tpi = TopicPartitionInfo.builder().myPartition(true).build();
}
@Test
public void givenDeviceBelongsToExternalPartition_whenOnDeviceConnect_thenCleansStateAndDoesNotReportConnect() {
// GIVEN
doReturn(true).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
// WHEN
service.onDeviceConnect(tenantId, deviceId, System.currentTimeMillis());
// THEN
then(service).should().cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
then(service).should(never()).getOrFetchDeviceStateData(deviceId);
then(service).should(never()).checkAndUpdateState(eq(deviceId), any());
then(clusterService).shouldHaveNoInteractions();
then(notificationRuleProcessor).shouldHaveNoInteractions();
then(telemetrySubscriptionService).shouldHaveNoInteractions();
}
@ParameterizedTest
@ValueSource(longs = {Long.MIN_VALUE, -100, -1})
public void givenNegativeLastConnectTime_whenOnDeviceConnect_thenSkipsThisEvent(long negativeLastConnectTime) {
// GIVEN
doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
// WHEN
service.onDeviceConnect(tenantId, deviceId, negativeLastConnectTime);
// THEN
then(service).should(never()).getOrFetchDeviceStateData(deviceId);
then(service).should(never()).checkAndUpdateState(eq(deviceId), any());
then(clusterService).shouldHaveNoInteractions();
then(notificationRuleProcessor).shouldHaveNoInteractions();
then(telemetrySubscriptionService).shouldHaveNoInteractions();
}
@ParameterizedTest
@MethodSource("provideOutdatedTimestamps")
public void givenOutdatedLastConnectTime_whenOnDeviceDisconnect_thenSkipsThisEvent(long outdatedLastConnectTime, long currentLastConnectTime) {
// GIVEN
doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(DeviceState.builder().lastConnectTime(currentLastConnectTime).build())
.build();
service.deviceStates.put(deviceId, deviceStateData);
// WHEN
service.onDeviceConnect(tenantId, deviceId, outdatedLastConnectTime);
// THEN
then(service).should(never()).checkAndUpdateState(eq(deviceId), any());
then(clusterService).shouldHaveNoInteractions();
then(notificationRuleProcessor).shouldHaveNoInteractions();
then(telemetrySubscriptionService).shouldHaveNoInteractions();
}
@Test
public void givenDeviceBelongsToMyPartition_whenOnDeviceConnect_thenReportsConnect() {
// GIVEN
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(DeviceState.builder().build())
.metaData(new TbMsgMetaData())
.build();
doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
service.deviceStates.put(deviceId, deviceStateData);
long lastConnectTime = System.currentTimeMillis();
// WHEN
service.onDeviceConnect(tenantId, deviceId, lastConnectTime);
// THEN
then(telemetrySubscriptionService).should().saveAttrAndNotify(
eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(DataConstants.SERVER_SCOPE), eq(LAST_CONNECT_TIME), eq(lastConnectTime), any()
);
var msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
then(clusterService).should().pushMsgToRuleEngine(eq(tenantId), eq(deviceId), msgCaptor.capture(), any());
var actualMsg = msgCaptor.getValue();
assertThat(actualMsg.getType()).isEqualTo(TbMsgType.CONNECT_EVENT.name());
assertThat(actualMsg.getOriginator()).isEqualTo(deviceId);
}
@Test
public void givenDeviceBelongsToExternalPartition_whenOnDeviceDisconnect_thenCleansStateAndDoesNotReportDisconnect() {
// GIVEN
doReturn(true).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
// WHEN
service.onDeviceDisconnect(tenantId, deviceId, System.currentTimeMillis());
// THEN
then(service).should().cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
then(service).should(never()).getOrFetchDeviceStateData(deviceId);
then(clusterService).shouldHaveNoInteractions();
then(notificationRuleProcessor).shouldHaveNoInteractions();
then(telemetrySubscriptionService).shouldHaveNoInteractions();
}
@ParameterizedTest
@ValueSource(longs = {Long.MIN_VALUE, -100, -1})
public void givenNegativeLastDisconnectTime_whenOnDeviceDisconnect_thenSkipsThisEvent(long negativeLastDisconnectTime) {
// GIVEN
doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
// WHEN
service.onDeviceDisconnect(tenantId, deviceId, negativeLastDisconnectTime);
// THEN
then(service).should(never()).getOrFetchDeviceStateData(deviceId);
then(clusterService).shouldHaveNoInteractions();
then(notificationRuleProcessor).shouldHaveNoInteractions();
then(telemetrySubscriptionService).shouldHaveNoInteractions();
}
@ParameterizedTest
@MethodSource("provideOutdatedTimestamps")
public void givenOutdatedLastDisconnectTime_whenOnDeviceDisconnect_thenSkipsThisEvent(long outdatedLastDisconnectTime, long currentLastDisconnectTime) {
// GIVEN
doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(DeviceState.builder().lastDisconnectTime(currentLastDisconnectTime).build())
.build();
service.deviceStates.put(deviceId, deviceStateData);
// WHEN
service.onDeviceDisconnect(tenantId, deviceId, outdatedLastDisconnectTime);
// THEN
then(clusterService).shouldHaveNoInteractions();
then(notificationRuleProcessor).shouldHaveNoInteractions();
then(telemetrySubscriptionService).shouldHaveNoInteractions();
}
@Test
public void givenDeviceBelongsToMyPartition_whenOnDeviceDisconnect_thenReportsDisconnect() {
// GIVEN
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(DeviceState.builder().build())
.metaData(new TbMsgMetaData())
.build();
doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
service.deviceStates.put(deviceId, deviceStateData);
long lastDisconnectTime = System.currentTimeMillis();
// WHEN
service.onDeviceDisconnect(tenantId, deviceId, lastDisconnectTime);
// THEN
then(telemetrySubscriptionService).should().saveAttrAndNotify(
eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(DataConstants.SERVER_SCOPE),
eq(LAST_DISCONNECT_TIME), eq(lastDisconnectTime), any()
);
var msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
then(clusterService).should().pushMsgToRuleEngine(eq(tenantId), eq(deviceId), msgCaptor.capture(), any());
var actualMsg = msgCaptor.getValue();
assertThat(actualMsg.getType()).isEqualTo(TbMsgType.DISCONNECT_EVENT.name());
assertThat(actualMsg.getOriginator()).isEqualTo(deviceId);
}
@Test
public void givenDeviceBelongsToExternalPartition_whenOnDeviceInactivity_thenCleansStateAndDoesNotReportInactivity() {
// GIVEN
doReturn(true).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
// WHEN
service.onDeviceInactivity(tenantId, deviceId, System.currentTimeMillis());
// THEN
then(service).should().cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
then(service).should(never()).fetchDeviceStateDataUsingSeparateRequests(deviceId);
then(clusterService).shouldHaveNoInteractions();
then(notificationRuleProcessor).shouldHaveNoInteractions();
then(telemetrySubscriptionService).shouldHaveNoInteractions();
}
@ParameterizedTest
@ValueSource(longs = {Long.MIN_VALUE, -100, -1})
public void givenNegativeLastInactivityTime_whenOnDeviceInactivity_thenSkipsThisEvent(long negativeLastInactivityTime) {
// GIVEN
doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
// WHEN
service.onDeviceInactivity(tenantId, deviceId, negativeLastInactivityTime);
// THEN
then(service).should(never()).getOrFetchDeviceStateData(deviceId);
then(clusterService).shouldHaveNoInteractions();
then(notificationRuleProcessor).shouldHaveNoInteractions();
then(telemetrySubscriptionService).shouldHaveNoInteractions();
}
@ParameterizedTest
@MethodSource("provideOutdatedTimestamps")
public void givenReceivedInactivityTimeIsLessThanOrEqualToCurrentInactivityTime_whenOnDeviceInactivity_thenSkipsThisEvent(
long outdatedLastInactivityTime, long currentLastInactivityTime
) {
// GIVEN
doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(DeviceState.builder().lastInactivityAlarmTime(currentLastInactivityTime).build())
.build();
service.deviceStates.put(deviceId, deviceStateData);
// WHEN
service.onDeviceInactivity(tenantId, deviceId, outdatedLastInactivityTime);
// THEN
then(clusterService).shouldHaveNoInteractions();
then(notificationRuleProcessor).shouldHaveNoInteractions();
then(telemetrySubscriptionService).shouldHaveNoInteractions();
}
@ParameterizedTest
@MethodSource("provideOutdatedTimestamps")
public void givenReceivedInactivityTimeIsLessThanOrEqualToCurrentActivityTime_whenOnDeviceInactivity_thenSkipsThisEvent(
long outdatedLastInactivityTime, long currentLastActivityTime
) {
// GIVEN
doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(DeviceState.builder().lastActivityTime(currentLastActivityTime).build())
.build();
service.deviceStates.put(deviceId, deviceStateData);
// WHEN
service.onDeviceInactivity(tenantId, deviceId, outdatedLastInactivityTime);
// THEN
then(clusterService).shouldHaveNoInteractions();
then(notificationRuleProcessor).shouldHaveNoInteractions();
then(telemetrySubscriptionService).shouldHaveNoInteractions();
}
private static Stream<Arguments> provideOutdatedTimestamps() {
return Stream.of(
Arguments.of(0, 0),
Arguments.of(0, 100),
Arguments.of(50, 100),
Arguments.of(99, 100),
Arguments.of(100, 100)
);
}
@Test
public void givenDeviceBelongsToMyPartition_whenOnDeviceInactivity_thenReportsInactivity() {
// GIVEN
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(DeviceState.builder().build())
.metaData(new TbMsgMetaData())
.build();
doReturn(false).when(service).cleanDeviceStateIfBelongsToExternalPartition(tenantId, deviceId);
service.deviceStates.put(deviceId, deviceStateData);
long lastInactivityTime = System.currentTimeMillis();
// WHEN
service.onDeviceInactivity(tenantId, deviceId, lastInactivityTime);
// THEN
then(telemetrySubscriptionService).should().saveAttrAndNotify(
eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(DataConstants.SERVER_SCOPE),
eq(INACTIVITY_ALARM_TIME), eq(lastInactivityTime), any()
);
then(telemetrySubscriptionService).should().saveAttrAndNotify(
eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(DataConstants.SERVER_SCOPE),
eq(ACTIVITY_STATE), eq(false), any()
);
var msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
then(clusterService).should()
.pushMsgToRuleEngine(eq(tenantId), eq(deviceId), msgCaptor.capture(), any());
var actualMsg = msgCaptor.getValue();
assertThat(actualMsg.getType()).isEqualTo(TbMsgType.INACTIVITY_EVENT.name());
assertThat(actualMsg.getOriginator()).isEqualTo(deviceId);
var notificationCaptor = ArgumentCaptor.forClass(DeviceActivityTrigger.class);
then(notificationRuleProcessor).should().process(notificationCaptor.capture());
var actualNotification = notificationCaptor.getValue();
assertThat(actualNotification.getTenantId()).isEqualTo(tenantId);
assertThat(actualNotification.getDeviceId()).isEqualTo(deviceId);
assertThat(actualNotification.isActive()).isFalse();
}
@Test
public void givenInactivityTimeoutReached_whenUpdateInactivityStateIfExpired_thenReportsInactivity() {
// GIVEN
var deviceStateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.state(DeviceState.builder().build())
.metaData(new TbMsgMetaData())
.build();
given(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId)).willReturn(tpi);
// WHEN
service.updateInactivityStateIfExpired(System.currentTimeMillis(), deviceId, deviceStateData);
// THEN
then(telemetrySubscriptionService).should().saveAttrAndNotify(
eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(DataConstants.SERVER_SCOPE),
eq(INACTIVITY_ALARM_TIME), anyLong(), any()
);
then(telemetrySubscriptionService).should().saveAttrAndNotify(
eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(DataConstants.SERVER_SCOPE),
eq(ACTIVITY_STATE), eq(false), any()
);
var msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
then(clusterService).should()
.pushMsgToRuleEngine(eq(tenantId), eq(deviceId), msgCaptor.capture(), any());
var actualMsg = msgCaptor.getValue();
assertThat(actualMsg.getType()).isEqualTo(TbMsgType.INACTIVITY_EVENT.name());
assertThat(actualMsg.getOriginator()).isEqualTo(deviceId);
var notificationCaptor = ArgumentCaptor.forClass(DeviceActivityTrigger.class);
then(notificationRuleProcessor).should().process(notificationCaptor.capture());
var actualNotification = notificationCaptor.getValue();
assertThat(actualNotification.getTenantId()).isEqualTo(tenantId);
assertThat(actualNotification.getDeviceId()).isEqualTo(deviceId);
assertThat(actualNotification.isActive()).isFalse();
}
@Test
public void givenDeviceIdFromDeviceStatesMap_whenGetOrFetchDeviceStateData_thenNoStackOverflow() {
service.deviceStates.put(deviceId, deviceStateDataMock);
DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId);
assertThat(deviceStateData).isEqualTo(deviceStateDataMock);
verify(service, never()).fetchDeviceStateDataUsingEntityDataQuery(deviceId);
verify(service, never()).fetchDeviceStateDataUsingSeparateRequests(deviceId);
}
@Test
public void givenDeviceIdWithoutDeviceStateInMap_whenGetOrFetchDeviceStateData_thenFetchDeviceStateData() {
service.deviceStates.clear();
willReturn(deviceStateDataMock).given(service).fetchDeviceStateDataUsingEntityDataQuery(deviceId);
willReturn(deviceStateDataMock).given(service).fetchDeviceStateDataUsingSeparateRequests(deviceId);
DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId);
assertThat(deviceStateData).isEqualTo(deviceStateDataMock);
verify(service, times(1)).fetchDeviceStateDataUsingEntityDataQuery(deviceId);
verify(service).fetchDeviceStateDataUsingSeparateRequests(deviceId);
}
@Test
@ -356,7 +712,7 @@ public class DefaultDeviceStateServiceTest {
}
private void activityVerify(boolean isActive) {
verify(telemetrySubscriptionService, times(1)).saveAttrAndNotify(any(), eq(deviceId), any(), eq(ACTIVITY_STATE), eq(isActive), any());
verify(telemetrySubscriptionService).saveAttrAndNotify(any(), eq(deviceId), any(), eq(ACTIVITY_STATE), eq(isActive), any());
}
@Test
@ -435,28 +791,28 @@ public class DefaultDeviceStateServiceTest {
private static Stream<Arguments> provideParametersForUpdateActivityState() {
return Stream.of(
Arguments.of(true, 100, 120, 80, 80, false, false),
Arguments.of(true, 100, 120, 80, 80, false, false),
Arguments.of(true, 100, 120, 100, 100, false, false),
Arguments.of(true, 100, 120, 100, 100, false, false),
Arguments.of(false, 100, 120, 110, 110, false, true),
Arguments.of(true, 100, 100, 80, 80, false, false),
Arguments.of(true, 100, 100, 80, 80, false, false),
Arguments.of(true, 100, 100, 100, 100, false, false),
Arguments.of(true, 100, 100, 100, 100, false, false),
Arguments.of(false, 100, 100, 110, 0, true, true),
Arguments.of(false, 100, 100, 110, 0, true, true),
Arguments.of(false, 100, 110, 110, 0, true, true),
Arguments.of(false, 100, 110, 110, 0, true, true),
Arguments.of(false, 100, 110, 120, 0, true, true),
Arguments.of(false, 100, 110, 120, 0, true, true),
Arguments.of(true, 0, 0, 0, 0, false, false),
Arguments.of(true, 0, 0, 0, 0, false, false),
Arguments.of(false, 0, 0, 0, 0, true, true)
Arguments.of(false, 0, 0, 0, 0, true, true)
);
}
@ -679,4 +1035,40 @@ public class DefaultDeviceStateServiceTest {
);
}
@Test
public void givenConcurrentAccess_whenGetOrFetchDeviceStateData_thenFetchDeviceStateDataInvokedOnce() {
doAnswer(invocation -> {
Thread.sleep(100);
return deviceStateDataMock;
}).when(service).fetchDeviceStateDataUsingSeparateRequests(deviceId);
int numberOfThreads = 10;
var allThreadsReadyLatch = new CountDownLatch(numberOfThreads);
ExecutorService executor = null;
try {
executor = Executors.newFixedThreadPool(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
executor.submit(() -> {
allThreadsReadyLatch.countDown();
try {
allThreadsReadyLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
service.getOrFetchDeviceStateData(deviceId);
});
}
executor.shutdown();
await().atMost(10, TimeUnit.SECONDS).until(executor::isTerminated);
} finally {
if (executor != null) {
executor.shutdownNow();
}
}
then(service).should().fetchDeviceStateDataUsingSeparateRequests(deviceId);
}
}

View File

@ -0,0 +1,266 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.state;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Stream;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
@ExtendWith(MockitoExtension.class)
public class DefaultRuleEngineDeviceStateManagerTest {
@Mock
private static DeviceStateService deviceStateServiceMock;
@Mock
private static TbCallback tbCallbackMock;
@Mock
private static TbClusterService clusterServiceMock;
@Mock
private static TbQueueMsgMetadata metadataMock;
@Mock
private TbServiceInfoProvider serviceInfoProviderMock;
@Mock
private PartitionService partitionServiceMock;
@Captor
private static ArgumentCaptor<TbQueueCallback> queueCallbackCaptor;
private static DefaultRuleEngineDeviceStateManager deviceStateManager;
private static final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("57ab2e6c-bc4c-11ee-a506-0242ac120002"));
private static final DeviceId DEVICE_ID = DeviceId.fromString("74a9053e-bc4c-11ee-a506-0242ac120002");
private static final long EVENT_TS = System.currentTimeMillis();
private static final RuntimeException RUNTIME_EXCEPTION = new RuntimeException("Something bad happened!");
private static final TopicPartitionInfo MY_TPI = TopicPartitionInfo.builder().myPartition(true).build();
private static final TopicPartitionInfo EXTERNAL_TPI = TopicPartitionInfo.builder().myPartition(false).build();
@BeforeEach
public void setup() {
deviceStateManager = new DefaultRuleEngineDeviceStateManager(serviceInfoProviderMock, partitionServiceMock, Optional.of(deviceStateServiceMock), clusterServiceMock);
}
@ParameterizedTest
@DisplayName("Given event should be routed to local service and event processed has succeeded, " +
"when onDeviceX() is called, then should route event to local service and call onSuccess() callback.")
@MethodSource
public void givenRoutedToLocalAndProcessingSuccess_whenOnDeviceAction_thenShouldCallLocalServiceAndSuccessCallback(Runnable onDeviceAction, Runnable actionVerification) {
// GIVEN
given(serviceInfoProviderMock.isService(ServiceType.TB_CORE)).willReturn(true);
given(partitionServiceMock.resolve(ServiceType.TB_CORE, TENANT_ID, DEVICE_ID)).willReturn(MY_TPI);
onDeviceAction.run();
// THEN
actionVerification.run();
then(clusterServiceMock).shouldHaveNoInteractions();
then(tbCallbackMock).should().onSuccess();
then(tbCallbackMock).should(never()).onFailure(any());
}
private static Stream<Arguments> givenRoutedToLocalAndProcessingSuccess_whenOnDeviceAction_thenShouldCallLocalServiceAndSuccessCallback() {
return Stream.of(
Arguments.of(
(Runnable) () -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Runnable) () -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Runnable) () -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Runnable) () -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS)
)
);
}
@ParameterizedTest
@DisplayName("Given event should be routed to local service and event processed has failed, " +
"when onDeviceX() is called, then should route event to local service and call onFailure() callback.")
@MethodSource
public void givenRoutedToLocalAndProcessingFailure_whenOnDeviceAction_thenShouldCallLocalServiceAndFailureCallback(
Runnable exceptionThrowSetup, Runnable onDeviceAction, Runnable actionVerification
) {
// GIVEN
given(serviceInfoProviderMock.isService(ServiceType.TB_CORE)).willReturn(true);
given(partitionServiceMock.resolve(ServiceType.TB_CORE, TENANT_ID, DEVICE_ID)).willReturn(MY_TPI);
exceptionThrowSetup.run();
// WHEN
onDeviceAction.run();
// THEN
actionVerification.run();
then(clusterServiceMock).shouldHaveNoInteractions();
then(tbCallbackMock).should(never()).onSuccess();
then(tbCallbackMock).should().onFailure(RUNTIME_EXCEPTION);
}
private static Stream<Arguments> givenRoutedToLocalAndProcessingFailure_whenOnDeviceAction_thenShouldCallLocalServiceAndFailureCallback() {
return Stream.of(
Arguments.of(
(Runnable) () -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS),
(Runnable) () -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Runnable) () -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS),
(Runnable) () -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Runnable) () -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS),
(Runnable) () -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Runnable) () -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS),
(Runnable) () -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS)
)
);
}
@ParameterizedTest
@DisplayName("Given event should be routed to external service, " +
"when onDeviceX() is called, then should send correct queue message to external service with correct callback object.")
@MethodSource
public void givenRoutedToExternal_whenOnDeviceAction_thenShouldSendQueueMsgToExternalServiceWithCorrectCallback(Runnable onDeviceAction, Runnable actionVerification) {
// WHEN
ReflectionTestUtils.setField(deviceStateManager, "deviceStateService", Optional.empty());
given(serviceInfoProviderMock.isService(ServiceType.TB_CORE)).willReturn(false);
given(partitionServiceMock.resolve(ServiceType.TB_CORE, TENANT_ID, DEVICE_ID)).willReturn(EXTERNAL_TPI);
onDeviceAction.run();
// THEN
actionVerification.run();
TbQueueCallback callback = queueCallbackCaptor.getValue();
callback.onSuccess(metadataMock);
then(tbCallbackMock).should().onSuccess();
callback.onFailure(RUNTIME_EXCEPTION);
then(tbCallbackMock).should().onFailure(RUNTIME_EXCEPTION);
}
private static Stream<Arguments> givenRoutedToExternal_whenOnDeviceAction_thenShouldSendQueueMsgToExternalServiceWithCorrectCallback() {
return Stream.of(
Arguments.of(
(Runnable) () -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> {
var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits())
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits())
.setLastConnectTime(EVENT_TS)
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceConnectMsg(deviceConnectMsg)
.build();
then(clusterServiceMock).should().pushMsgToCore(eq(EXTERNAL_TPI), any(UUID.class), eq(toCoreMsg), queueCallbackCaptor.capture());
}
),
Arguments.of(
(Runnable) () -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> {
var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits())
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits())
.setLastActivityTime(EVENT_TS)
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceActivityMsg(deviceActivityMsg)
.build();
then(clusterServiceMock).should().pushMsgToCore(eq(EXTERNAL_TPI), any(UUID.class), eq(toCoreMsg), queueCallbackCaptor.capture());
}
),
Arguments.of(
(Runnable) () -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> {
var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits())
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits())
.setLastDisconnectTime(EVENT_TS)
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceDisconnectMsg(deviceDisconnectMsg)
.build();
then(clusterServiceMock).should().pushMsgToCore(eq(EXTERNAL_TPI), any(UUID.class), eq(toCoreMsg), queueCallbackCaptor.capture());
}
),
Arguments.of(
(Runnable) () -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> {
var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits())
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits())
.setLastInactivityTime(EVENT_TS)
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceInactivityMsg(deviceInactivityMsg)
.build();
then(clusterServiceMock).should().pushMsgToCore(eq(EXTERNAL_TPI), any(UUID.class), eq(toCoreMsg), queueCallbackCaptor.capture());
}
)
);
}
}

View File

@ -22,22 +22,22 @@ import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.thingsboard.server.common.data.msg.TbMsgType.ALARM;
import static org.thingsboard.server.common.data.msg.TbMsgType.ALARM_DELETE;
import static org.thingsboard.server.common.data.msg.TbMsgType.NA;
import static org.thingsboard.server.common.data.msg.TbMsgType.DEDUPLICATION_TIMEOUT_SELF_MSG;
import static org.thingsboard.server.common.data.msg.TbMsgType.DELAY_TIMEOUT_SELF_MSG;
import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_ASSIGNED_TO_EDGE;
import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_UNASSIGNED_FROM_EDGE;
import static org.thingsboard.server.common.data.msg.TbMsgType.MSG_COUNT_SELF_MSG;
import static org.thingsboard.server.common.data.msg.TbMsgType.PROVISION_FAILURE;
import static org.thingsboard.server.common.data.msg.TbMsgType.PROVISION_SUCCESS;
import static org.thingsboard.server.common.data.msg.TbMsgType.DEVICE_PROFILE_PERIODIC_SELF_MSG;
import static org.thingsboard.server.common.data.msg.TbMsgType.DEVICE_PROFILE_UPDATE_SELF_MSG;
import static org.thingsboard.server.common.data.msg.TbMsgType.DEVICE_UPDATE_SELF_MSG;
import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_ASSIGNED_TO_EDGE;
import static org.thingsboard.server.common.data.msg.TbMsgType.ENTITY_UNASSIGNED_FROM_EDGE;
import static org.thingsboard.server.common.data.msg.TbMsgType.GENERATOR_NODE_SELF_MSG;
import static org.thingsboard.server.common.data.msg.TbMsgType.MSG_COUNT_SELF_MSG;
import static org.thingsboard.server.common.data.msg.TbMsgType.NA;
import static org.thingsboard.server.common.data.msg.TbMsgType.PROVISION_FAILURE;
import static org.thingsboard.server.common.data.msg.TbMsgType.PROVISION_SUCCESS;
import static org.thingsboard.server.common.data.msg.TbMsgType.SEND_EMAIL;
class TbMsgTypeTest {
private static final List<TbMsgType> typesWithNullRuleNodeConnection = List.of(
ALARM,
ALARM_DELETE,

View File

@ -131,6 +131,11 @@ public final class TbMsg implements Serializable {
metaData.copy(), TbMsgDataType.JSON, data, null, null, null, TbMsgCallback.EMPTY);
}
public static TbMsg newMsg(TbMsgType type, EntityId originator, TbMsgMetaData metaData, String data, long ts) {
return new TbMsg(null, UUID.randomUUID(), ts, type, originator, null,
metaData.copy(), TbMsgDataType.JSON, data, null, null, null, TbMsgCallback.EMPTY);
}
// REALLY NEW MSG
/**
@ -335,7 +340,7 @@ public final class TbMsg implements Serializable {
this.originator = originator;
if (customerId == null || customerId.isNullUid()) {
if (originator != null && originator.getEntityType() == EntityType.CUSTOMER) {
this.customerId = (CustomerId) originator;
this.customerId = new CustomerId(originator.getId());
} else {
this.customerId = null;
}

View File

@ -551,6 +551,14 @@ message GetOtaPackageResponseMsg {
string fileName = 8;
}
message DeviceConnectProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 deviceIdMSB = 3;
int64 deviceIdLSB = 4;
int64 lastConnectTime = 5;
}
message DeviceActivityProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
@ -559,6 +567,22 @@ message DeviceActivityProto {
int64 lastActivityTime = 5;
}
message DeviceDisconnectProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 deviceIdMSB = 3;
int64 deviceIdLSB = 4;
int64 lastDisconnectTime = 5;
}
message DeviceInactivityProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 deviceIdMSB = 3;
int64 deviceIdLSB = 4;
int64 lastInactivityTime = 5;
}
//Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.
message SubscriptionInfoProto {
int64 lastActivityTime = 1;
@ -1271,6 +1295,9 @@ message ToCoreMsg {
LifecycleEventProto lifecycleEventMsg = 8;
ErrorEventProto errorEventMsg = 9;
ToDeviceActorNotificationMsgProto toDeviceActorNotification = 10;
DeviceConnectProto deviceConnectMsg = 50;
DeviceDisconnectProto deviceDisconnectMsg = 51;
DeviceInactivityProto deviceInactivityMsg = 52;
}
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */

View File

@ -0,0 +1,47 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.common;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import java.util.function.Consumer;
public class SimpleTbQueueCallback implements TbQueueCallback {
private final Consumer<TbQueueMsgMetadata> onSuccess;
private final Consumer<Throwable> onFailure;
public SimpleTbQueueCallback(Consumer<TbQueueMsgMetadata> onSuccess, Consumer<Throwable> onFailure) {
this.onSuccess = onSuccess;
this.onFailure = onFailure;
}
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
if (onSuccess != null) {
onSuccess.accept(metadata);
}
}
@Override
public void onFailure(Throwable t) {
if (onFailure != null) {
onFailure.accept(t);
}
}
}

View File

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.api;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TbCallback;
public interface RuleEngineDeviceStateManager {
void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long connectTime, TbCallback callback);
void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long activityTime, TbCallback callback);
void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId, long disconnectTime, TbCallback callback);
void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long inactivityTime, TbCallback callback);
}

View File

@ -280,6 +280,10 @@ public interface TbContext {
DeviceCredentialsService getDeviceCredentialsService();
RuleEngineDeviceStateManager getDeviceStateManager();
String getDeviceStateNodeRateLimitConfig();
TbClusterService getClusterService();
DashboardService getDashboardService();

View File

@ -0,0 +1,173 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.action;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.tools.TbRateLimits;
import java.util.EnumSet;
import java.util.Set;
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "device state",
nodeDescription = "Triggers device connectivity events",
nodeDetails = "If incoming message originator is a device, registers configured event for that device in the Device State Service, which sends appropriate message to the Rule Engine." +
" If metadata <code>ts</code> property is present, it will be used as event timestamp. Otherwise, the message timestamp will be used." +
" If originator entity type is not <code>DEVICE</code> or unexpected error happened during processing, then incoming message is forwarded using <code>Failure</code> chain." +
" If rate of connectivity events for a given originator is too high, then incoming message is forwarded using <code>Rate limited</code> chain. " +
"<br>" +
"Supported device connectivity events are:" +
"<ul>" +
"<li>Connect event</li>" +
"<li>Disconnect event</li>" +
"<li>Activity event</li>" +
"<li>Inactivity event</li>" +
"</ul>" +
"This node is particularly useful when device isn't using transports to receive data, such as when fetching data from external API or computing new data within the rule chain.",
configClazz = TbDeviceStateNodeConfiguration.class,
relationTypes = {TbNodeConnectionType.SUCCESS, TbNodeConnectionType.FAILURE, "Rate limited"},
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeDeviceStateConfig"
)
public class TbDeviceStateNode implements TbNode {
private static final Set<TbMsgType> SUPPORTED_EVENTS = EnumSet.of(
TbMsgType.CONNECT_EVENT, TbMsgType.ACTIVITY_EVENT, TbMsgType.DISCONNECT_EVENT, TbMsgType.INACTIVITY_EVENT
);
private static final String DEFAULT_RATE_LIMIT_CONFIG = "1:1,30:60,60:3600";
private ConcurrentReferenceHashMap<DeviceId, TbRateLimits> rateLimits;
private String rateLimitConfig;
private TbMsgType event;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
TbMsgType event = TbNodeUtils.convert(configuration, TbDeviceStateNodeConfiguration.class).getEvent();
if (event == null) {
throw new TbNodeException("Event cannot be null!", true);
}
if (!SUPPORTED_EVENTS.contains(event)) {
throw new TbNodeException("Unsupported event: " + event, true);
}
this.event = event;
rateLimits = new ConcurrentReferenceHashMap<>();
String deviceStateNodeRateLimitConfig = ctx.getDeviceStateNodeRateLimitConfig();
try {
rateLimitConfig = new TbRateLimits(deviceStateNodeRateLimitConfig).getConfiguration();
} catch (Exception e) {
log.error("[{}][{}] Invalid rate limit configuration provided: [{}]. Will use default value [{}].",
ctx.getTenantId().getId(), ctx.getSelfId().getId(), deviceStateNodeRateLimitConfig, DEFAULT_RATE_LIMIT_CONFIG, e);
rateLimitConfig = DEFAULT_RATE_LIMIT_CONFIG;
}
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
EntityType originatorEntityType = msg.getOriginator().getEntityType();
if (!EntityType.DEVICE.equals(originatorEntityType)) {
ctx.tellFailure(msg, new IllegalArgumentException(
"Unsupported originator entity type: [" + originatorEntityType + "]. Only DEVICE entity type is supported."
));
return;
}
DeviceId originator = new DeviceId(msg.getOriginator().getId());
rateLimits.compute(originator, (__, rateLimit) -> {
if (rateLimit == null) {
rateLimit = new TbRateLimits(rateLimitConfig);
}
boolean isNotRateLimited = rateLimit.tryConsume();
if (isNotRateLimited) {
sendEventAndTell(ctx, originator, msg);
} else {
ctx.tellNext(msg, "Rate limited");
}
return rateLimit;
});
}
private void sendEventAndTell(TbContext ctx, DeviceId originator, TbMsg msg) {
TenantId tenantId = ctx.getTenantId();
long eventTs = msg.getMetaDataTs();
RuleEngineDeviceStateManager deviceStateManager = ctx.getDeviceStateManager();
TbCallback callback = getMsgEnqueuedCallback(ctx, msg);
switch (event) {
case CONNECT_EVENT:
deviceStateManager.onDeviceConnect(tenantId, originator, eventTs, callback);
break;
case ACTIVITY_EVENT:
deviceStateManager.onDeviceActivity(tenantId, originator, eventTs, callback);
break;
case DISCONNECT_EVENT:
deviceStateManager.onDeviceDisconnect(tenantId, originator, eventTs, callback);
break;
case INACTIVITY_EVENT:
deviceStateManager.onDeviceInactivity(tenantId, originator, eventTs, callback);
break;
default:
ctx.tellFailure(msg, new IllegalStateException("Configured event [" + event + "] is not supported!"));
}
}
private TbCallback getMsgEnqueuedCallback(TbContext ctx, TbMsg msg) {
return new TbCallback() {
@Override
public void onSuccess() {
ctx.tellSuccess(msg);
}
@Override
public void onFailure(Throwable t) {
ctx.tellFailure(msg, t);
}
};
}
@Override
public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
rateLimits.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey()));
}
@Override
public void destroy() {
if (rateLimits != null) {
rateLimits.clear();
rateLimits = null;
}
rateLimitConfig = null;
event = null;
}
}

View File

@ -0,0 +1,34 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.action;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
import org.thingsboard.server.common.data.msg.TbMsgType;
@Data
public class TbDeviceStateNodeConfiguration implements NodeConfiguration<TbDeviceStateNodeConfiguration> {
private TbMsgType event;
@Override
public TbDeviceStateNodeConfiguration defaultConfiguration() {
var config = new TbDeviceStateNodeConfiguration();
config.setEvent(TbMsgType.ACTIVITY_EVENT);
return config;
}
}

View File

@ -0,0 +1,296 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.action;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.tools.TbRateLimits;
import java.util.UUID;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ExtendWith(MockitoExtension.class)
public class TbDeviceStateNodeTest {
@Mock
private TbContext ctxMock;
@Mock
private static RuleEngineDeviceStateManager deviceStateManagerMock;
@Captor
private static ArgumentCaptor<TbCallback> callbackCaptor;
private TbDeviceStateNode node;
private TbDeviceStateNodeConfiguration config;
private static final TenantId TENANT_ID = TenantId.fromUUID(UUID.randomUUID());
private static final DeviceId DEVICE_ID = new DeviceId(UUID.randomUUID());
private static final long METADATA_TS = 123L;
private TbMsg msg;
@BeforeEach
public void setup() {
var metaData = new TbMsgMetaData();
metaData.putValue("deviceName", "My humidity sensor");
metaData.putValue("deviceType", "Humidity sensor");
metaData.putValue("ts", String.valueOf(METADATA_TS));
var data = JacksonUtil.newObjectNode();
data.put("humidity", 58.3);
msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, JacksonUtil.toString(data));
}
@BeforeEach
public void setUp() {
node = new TbDeviceStateNode();
config = new TbDeviceStateNodeConfiguration().defaultConfiguration();
}
@Test
public void givenDefaultConfiguration_whenInvoked_thenCorrectValuesAreSet() {
assertThat(config.getEvent()).isEqualTo(TbMsgType.ACTIVITY_EVENT);
}
@Test
public void givenNullEventInConfig_whenInit_thenThrowsUnrecoverableTbNodeException() {
// GIVEN-WHEN-THEN
assertThatThrownBy(() -> initNode(null))
.isInstanceOf(TbNodeException.class)
.hasMessage("Event cannot be null!")
.matches(e -> ((TbNodeException) e).isUnrecoverable());
}
@Test
public void givenInvalidRateLimitConfig_whenInit_thenUsesDefaultConfig() {
// GIVEN
given(ctxMock.getDeviceStateNodeRateLimitConfig()).willReturn("invalid rate limit config");
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
given(ctxMock.getSelfId()).willReturn(new RuleNodeId(UUID.randomUUID()));
// WHEN
try {
initNode(TbMsgType.ACTIVITY_EVENT);
} catch (Exception e) {
fail("Node failed to initialize!", e);
}
// THEN
String actualRateLimitConfig = (String) ReflectionTestUtils.getField(node, "rateLimitConfig");
assertThat(actualRateLimitConfig).isEqualTo("1:1,30:60,60:3600");
}
@Test
public void givenMsgArrivedTooFast_whenOnMsg_thenRateLimitsThisMsg() {
// GIVEN
ConcurrentReferenceHashMap<DeviceId, TbRateLimits> rateLimits = new ConcurrentReferenceHashMap<>();
ReflectionTestUtils.setField(node, "rateLimits", rateLimits);
var rateLimitMock = mock(TbRateLimits.class);
rateLimits.put(DEVICE_ID, rateLimitMock);
given(rateLimitMock.tryConsume()).willReturn(false);
// WHEN
node.onMsg(ctxMock, msg);
// THEN
then(ctxMock).should().tellNext(msg, "Rate limited");
then(ctxMock).should(never()).tellSuccess(any());
then(ctxMock).should(never()).tellFailure(any(), any());
then(ctxMock).shouldHaveNoMoreInteractions();
then(deviceStateManagerMock).shouldHaveNoInteractions();
}
@Test
public void givenHasNonLocalDevices_whenOnPartitionChange_thenRemovesEntriesForNonLocalDevices() {
// GIVEN
ConcurrentReferenceHashMap<DeviceId, TbRateLimits> rateLimits = new ConcurrentReferenceHashMap<>();
ReflectionTestUtils.setField(node, "rateLimits", rateLimits);
rateLimits.put(DEVICE_ID, new TbRateLimits("1:1"));
given(ctxMock.isLocalEntity(eq(DEVICE_ID))).willReturn(true);
DeviceId nonLocalDeviceId1 = new DeviceId(UUID.randomUUID());
rateLimits.put(nonLocalDeviceId1, new TbRateLimits("2:2"));
given(ctxMock.isLocalEntity(eq(nonLocalDeviceId1))).willReturn(false);
DeviceId nonLocalDeviceId2 = new DeviceId(UUID.randomUUID());
rateLimits.put(nonLocalDeviceId2, new TbRateLimits("3:3"));
given(ctxMock.isLocalEntity(eq(nonLocalDeviceId2))).willReturn(false);
// WHEN
node.onPartitionChangeMsg(ctxMock, new PartitionChangeMsg(ServiceType.TB_RULE_ENGINE));
// THEN
assertThat(rateLimits)
.containsKey(DEVICE_ID)
.doesNotContainKey(nonLocalDeviceId1)
.doesNotContainKey(nonLocalDeviceId2)
.size().isOne();
}
@ParameterizedTest
@EnumSource(
value = TbMsgType.class,
names = {"CONNECT_EVENT", "ACTIVITY_EVENT", "DISCONNECT_EVENT", "INACTIVITY_EVENT"},
mode = EnumSource.Mode.EXCLUDE
)
public void givenUnsupportedEventInConfig_whenInit_thenThrowsUnrecoverableTbNodeException(TbMsgType unsupportedEvent) {
// GIVEN-WHEN-THEN
assertThatThrownBy(() -> initNode(unsupportedEvent))
.isInstanceOf(TbNodeException.class)
.hasMessage("Unsupported event: " + unsupportedEvent)
.matches(e -> ((TbNodeException) e).isUnrecoverable());
}
@ParameterizedTest
@EnumSource(value = EntityType.class, names = "DEVICE", mode = EnumSource.Mode.EXCLUDE)
public void givenNonDeviceOriginator_whenOnMsg_thenTellsSuccessAndNoActivityActionsTriggered(EntityType unsupportedType) {
// GIVEN
var nonDeviceOriginator = new EntityId() {
@Override
public UUID getId() {
return UUID.randomUUID();
}
@Override
public EntityType getEntityType() {
return unsupportedType;
}
};
var msg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, nonDeviceOriginator, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
// WHEN
node.onMsg(ctxMock, msg);
// THEN
var exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
then(ctxMock).should().tellFailure(eq(msg), exceptionCaptor.capture());
assertThat(exceptionCaptor.getValue())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Unsupported originator entity type: [" + unsupportedType + "]. Only DEVICE entity type is supported.");
then(ctxMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenMetadataDoesNotContainTs_whenOnMsg_thenMsgTsIsUsedAsEventTs() {
// GIVEN
given(ctxMock.getDeviceStateNodeRateLimitConfig()).willReturn("1:1");
try {
initNode(TbMsgType.ACTIVITY_EVENT);
} catch (TbNodeException e) {
fail("Node failed to initialize!", e);
}
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
given(ctxMock.getDeviceStateManager()).willReturn(deviceStateManagerMock);
long msgTs = METADATA_TS + 1;
msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT, msgTs);
// WHEN
node.onMsg(ctxMock, msg);
// THEN
then(deviceStateManagerMock).should().onDeviceActivity(eq(TENANT_ID), eq(DEVICE_ID), eq(msgTs), any());
}
@ParameterizedTest
@MethodSource
public void givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback(TbMsgType supportedEventType, Runnable actionVerification) {
// GIVEN
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
given(ctxMock.getDeviceStateNodeRateLimitConfig()).willReturn("1:1");
given(ctxMock.getDeviceStateManager()).willReturn(deviceStateManagerMock);
try {
initNode(supportedEventType);
} catch (TbNodeException e) {
fail("Node failed to initialize!", e);
}
// WHEN
node.onMsg(ctxMock, msg);
// THEN
actionVerification.run();
TbCallback actualCallback = callbackCaptor.getValue();
actualCallback.onSuccess();
then(ctxMock).should().tellSuccess(msg);
var throwable = new Throwable();
actualCallback.onFailure(throwable);
then(ctxMock).should().tellFailure(msg, throwable);
then(deviceStateManagerMock).shouldHaveNoMoreInteractions();
then(ctxMock).shouldHaveNoMoreInteractions();
}
private static Stream<Arguments> givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback() {
return Stream.of(
Arguments.of(TbMsgType.CONNECT_EVENT, (Runnable) () -> then(deviceStateManagerMock).should().onDeviceConnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.ACTIVITY_EVENT, (Runnable) () -> then(deviceStateManagerMock).should().onDeviceActivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.DISCONNECT_EVENT, (Runnable) () -> then(deviceStateManagerMock).should().onDeviceDisconnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.INACTIVITY_EVENT, (Runnable) () -> then(deviceStateManagerMock).should().onDeviceInactivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture()))
);
}
private void initNode(TbMsgType event) throws TbNodeException {
config.setEvent(event);
var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctxMock, nodeConfig);
}
}