From 2f22a5e5817e54a08bd2315d539e4e8f8f3ce2fa Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Fri, 28 Feb 2025 11:36:42 +0200 Subject: [PATCH 1/4] Save time series strategies: ensure Device State Service is notified about inactivity timeout updates --- .../server/actors/ActorSystemContext.java | 4 +- .../actors/ruleChain/DefaultTbContext.java | 4 +- .../queue/DefaultTbCoreConsumerService.java | 18 ++ .../service/queue/TbCoreConsumerStats.java | 8 + .../state/DefaultDeviceStateManager.java | 180 ++++++++++++++++ .../DefaultRuleEngineDeviceStateManager.java | 196 ------------------ .../DefaultSubscriptionManagerService.java | 3 - .../DefaultTelemetrySubscriptionService.java | 36 +++- ...ava => DefaultDeviceStateManagerTest.java} | 36 ++-- ...faultTelemetrySubscriptionServiceTest.java | 5 +- common/proto/src/main/proto/queue.proto | 9 + .../thingsboard/common/util/DonAsynchron.java | 6 + ...teManager.java => DeviceStateManager.java} | 4 +- .../rule/engine/api/TbContext.java | 2 +- .../rule/engine/action/TbDeviceStateNode.java | 4 +- .../engine/action/TbDeviceStateNodeTest.java | 14 +- 16 files changed, 295 insertions(+), 234 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateManager.java delete mode 100644 application/src/main/java/org/thingsboard/server/service/state/DefaultRuleEngineDeviceStateManager.java rename application/src/test/java/org/thingsboard/server/service/state/{DefaultRuleEngineDeviceStateManagerTest.java => DefaultDeviceStateManagerTest.java} (81%) rename rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/{RuleEngineDeviceStateManager.java => DeviceStateManager.java} (88%) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index eb76473386..1cec80dffc 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -33,7 +33,7 @@ import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; -import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; +import org.thingsboard.rule.engine.api.DeviceStateManager; import org.thingsboard.rule.engine.api.SmsService; import org.thingsboard.rule.engine.api.notification.SlackService; import org.thingsboard.rule.engine.api.sms.SmsSenderFactory; @@ -206,7 +206,7 @@ public class ActorSystemContext { @Autowired(required = false) @Getter - private RuleEngineDeviceStateManager deviceStateManager; + private DeviceStateManager deviceStateManager; @Autowired @Getter diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index a1e8434b6e..64afa3eca0 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -29,7 +29,7 @@ import org.thingsboard.rule.engine.api.RuleEngineAlarmService; import org.thingsboard.rule.engine.api.RuleEngineApiUsageStateService; import org.thingsboard.rule.engine.api.RuleEngineAssetProfileCache; import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache; -import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; +import org.thingsboard.rule.engine.api.DeviceStateManager; import org.thingsboard.rule.engine.api.RuleEngineRpcService; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.ScriptEngine; @@ -724,7 +724,7 @@ public class DefaultTbContext implements TbContext { } @Override - public RuleEngineDeviceStateManager getDeviceStateManager() { + public DeviceStateManager getDeviceStateManager() { return mainCtx.getDeviceStateManager(); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 586902d542..f9f4e86773 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -289,6 +289,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService future = deviceActivityEventsExecutor.submit(() -> stateService.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, deviceInactivityTimeoutUpdateMsg.getInactivityTimeout())); + DonAsynchron.withCallback(future, + __ -> callback.onSuccess(), + t -> { + log.warn("[{}] Failed to process device inactivity timeout update message for device [{}]", tenantId.getId(), deviceId.getId(), t); + callback.onFailure(t); + }); + } + private void forwardToNotificationSchedulerService(TransportProtos.NotificationSchedulerServiceMsg msg, TbCallback callback) { TenantId tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); NotificationRequestId notificationRequestId = new NotificationRequestId(new UUID(msg.getRequestIdMSB(), msg.getRequestIdLSB())); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java index 728f18bfff..46a42284b4 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java @@ -40,6 +40,7 @@ public class TbCoreConsumerStats { public static final String DEVICE_ACTIVITIES = "deviceActivity"; public static final String DEVICE_DISCONNECTS = "deviceDisconnect"; public static final String DEVICE_INACTIVITIES = "deviceInactivity"; + public static final String DEVICE_INACTIVITY_TIMEOUT_UPDATES = "deviceInactivityTimeoutUpdate"; public static final String TO_CORE_NF_OTHER = "coreNfOther"; // normally, there is no messages when codebase is fine public static final String TO_CORE_NF_COMPONENT_LIFECYCLE = "coreNfCompLfcl"; @@ -65,6 +66,7 @@ public class TbCoreConsumerStats { private final StatsCounter deviceActivitiesCounter; private final StatsCounter deviceDisconnectsCounter; private final StatsCounter deviceInactivitiesCounter; + private final StatsCounter deviceInactivityTimeoutUpdatesCounter; private final StatsCounter toCoreNfOtherCounter; private final StatsCounter toCoreNfComponentLifecycleCounter; @@ -95,6 +97,7 @@ public class TbCoreConsumerStats { this.deviceActivitiesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_ACTIVITIES)); this.deviceDisconnectsCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_DISCONNECTS)); this.deviceInactivitiesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_INACTIVITIES)); + this.deviceInactivityTimeoutUpdatesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_INACTIVITY_TIMEOUT_UPDATES)); // Core notification counters this.toCoreNfOtherCounter = register(statsFactory.createStatsCounter(statsKey, TO_CORE_NF_OTHER)); @@ -163,6 +166,11 @@ public class TbCoreConsumerStats { deviceInactivitiesCounter.increment(); } + public void log(TransportProtos.DeviceInactivityTimeoutUpdateProto msg) { + totalCounter.increment(); + deviceInactivityTimeoutUpdatesCounter.increment(); + } + public void log(TransportProtos.SubscriptionMgrMsgProto msg) { totalCounter.increment(); subscriptionMsgCounter.increment(); diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateManager.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateManager.java new file mode 100644 index 0000000000..26750e887e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateManager.java @@ -0,0 +1,180 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.state; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.thingsboard.rule.engine.api.DeviceStateManager; +import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.common.SimpleTbQueueCallback; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; + +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Supplier; + +@Slf4j +@Service +@RequiredArgsConstructor +public class DefaultDeviceStateManager implements DeviceStateManager { + + private final TbServiceInfoProvider serviceInfoProvider; + private final PartitionService partitionService; + + private final Optional deviceStateService; + private final TbClusterService clusterService; + + @Override + public void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long connectTime, TbCallback callback) { + forwardToDeviceStateService(tenantId, deviceId, + deviceStateService -> { + log.debug("[{}][{}] Forwarding device connect event to local service. Connect time: [{}].", tenantId.getId(), deviceId.getId(), connectTime); + deviceStateService.onDeviceConnect(tenantId, deviceId, connectTime); + }, + () -> { + log.debug("[{}][{}] Sending device connect message to core. Connect time: [{}].", tenantId.getId(), deviceId.getId(), connectTime); + var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setLastConnectTime(connectTime) + .build(); + return TransportProtos.ToCoreMsg.newBuilder() + .setDeviceConnectMsg(deviceConnectMsg) + .build(); + }, callback); + } + + @Override + public void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long activityTime, TbCallback callback) { + forwardToDeviceStateService(tenantId, deviceId, + deviceStateService -> { + log.debug("[{}][{}] Forwarding device activity event to local service. Activity time: [{}].", tenantId.getId(), deviceId.getId(), activityTime); + deviceStateService.onDeviceActivity(tenantId, deviceId, activityTime); + }, + () -> { + log.debug("[{}][{}] Sending device activity message to core. Activity time: [{}].", tenantId.getId(), deviceId.getId(), activityTime); + var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setLastActivityTime(activityTime) + .build(); + return TransportProtos.ToCoreMsg.newBuilder() + .setDeviceActivityMsg(deviceActivityMsg) + .build(); + }, callback); + } + + @Override + public void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId, long disconnectTime, TbCallback callback) { + forwardToDeviceStateService(tenantId, deviceId, + deviceStateService -> { + log.debug("[{}][{}] Forwarding device disconnect event to local service. Disconnect time: [{}].", tenantId.getId(), deviceId.getId(), disconnectTime); + deviceStateService.onDeviceDisconnect(tenantId, deviceId, disconnectTime); + }, + () -> { + log.debug("[{}][{}] Sending device disconnect message to core. Disconnect time: [{}].", tenantId.getId(), deviceId.getId(), disconnectTime); + var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setLastDisconnectTime(disconnectTime) + .build(); + return TransportProtos.ToCoreMsg.newBuilder() + .setDeviceDisconnectMsg(deviceDisconnectMsg) + .build(); + }, callback); + } + + @Override + public void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long inactivityTime, TbCallback callback) { + forwardToDeviceStateService(tenantId, deviceId, + deviceStateService -> { + log.debug("[{}][{}] Forwarding device inactivity event to local service. Inactivity time: [{}].", tenantId.getId(), deviceId.getId(), inactivityTime); + deviceStateService.onDeviceInactivity(tenantId, deviceId, inactivityTime); + }, + () -> { + log.debug("[{}][{}] Sending device inactivity message to core. Inactivity time: [{}].", tenantId.getId(), deviceId.getId(), inactivityTime); + var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setLastInactivityTime(inactivityTime) + .build(); + return TransportProtos.ToCoreMsg.newBuilder() + .setDeviceInactivityMsg(deviceInactivityMsg) + .build(); + }, callback); + } + + @Override + public void onDeviceInactivityTimeoutUpdate(TenantId tenantId, DeviceId deviceId, long inactivityTimeout, TbCallback callback) { + forwardToDeviceStateService(tenantId, deviceId, + deviceStateService -> { + log.debug("[{}][{}] Forwarding device inactivity timeout update to local service. Updated inactivity timeout: [{}].", tenantId.getId(), deviceId.getId(), inactivityTimeout); + deviceStateService.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, inactivityTimeout); + }, + () -> { + log.debug("[{}][{}] Sending device inactivity timeout update message to core. Updated inactivity timeout: [{}].", tenantId.getId(), deviceId.getId(), inactivityTimeout); + var deviceInactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setInactivityTimeout(inactivityTimeout) + .build(); + return TransportProtos.ToCoreMsg.newBuilder() + .setDeviceInactivityTimeoutUpdateMsg(deviceInactivityTimeoutUpdateMsg) + .build(); + }, callback); + } + + private void forwardToDeviceStateService( + TenantId tenantId, DeviceId deviceId, + Consumer toDeviceStateService, + Supplier toCore, + TbCallback callback + ) { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); + if (serviceInfoProvider.isService(ServiceType.TB_CORE) && tpi.isMyPartition() && deviceStateService.isPresent()) { + try { + toDeviceStateService.accept(deviceStateService.get()); + } catch (Exception e) { + log.error("[{}][{}] Failed to process device connectivity event.", tenantId.getId(), deviceId.getId(), e); + callback.onFailure(e); + return; + } + callback.onSuccess(); + } else { + TransportProtos.ToCoreMsg toCoreMsg = toCore.get(); + clusterService.pushMsgToCore(tpi, deviceId.getId(), toCoreMsg, new SimpleTbQueueCallback(__ -> callback.onSuccess(), callback::onFailure)); + } + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultRuleEngineDeviceStateManager.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultRuleEngineDeviceStateManager.java deleted file mode 100644 index b064c1e8c6..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultRuleEngineDeviceStateManager.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Copyright © 2016-2025 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.state; - -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; -import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; -import org.thingsboard.server.cluster.TbClusterService; -import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.queue.common.SimpleTbQueueCallback; -import org.thingsboard.server.queue.discovery.PartitionService; -import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; -import org.thingsboard.server.queue.util.TbRuleEngineComponent; - -import java.util.Optional; -import java.util.UUID; - -@Slf4j -@Service -@TbRuleEngineComponent -public class DefaultRuleEngineDeviceStateManager implements RuleEngineDeviceStateManager { - - private final TbServiceInfoProvider serviceInfoProvider; - private final PartitionService partitionService; - - private final Optional deviceStateService; - private final TbClusterService clusterService; - - public DefaultRuleEngineDeviceStateManager( - TbServiceInfoProvider serviceInfoProvider, PartitionService partitionService, - Optional deviceStateServiceOptional, TbClusterService clusterService - ) { - this.serviceInfoProvider = serviceInfoProvider; - this.partitionService = partitionService; - this.deviceStateService = deviceStateServiceOptional; - this.clusterService = clusterService; - } - - @Getter - private abstract static class ConnectivityEventInfo { - - private final TenantId tenantId; - private final DeviceId deviceId; - private final long eventTime; - - private ConnectivityEventInfo(TenantId tenantId, DeviceId deviceId, long eventTime) { - this.tenantId = tenantId; - this.deviceId = deviceId; - this.eventTime = eventTime; - } - - abstract void forwardToLocalService(); - - abstract TransportProtos.ToCoreMsg toQueueMsg(); - - } - - @Override - public void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long connectTime, TbCallback callback) { - routeEvent(new ConnectivityEventInfo(tenantId, deviceId, connectTime) { - @Override - void forwardToLocalService() { - deviceStateService.ifPresent(service -> service.onDeviceConnect(tenantId, deviceId, connectTime)); - } - - @Override - TransportProtos.ToCoreMsg toQueueMsg() { - var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder() - .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) - .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) - .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) - .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) - .setLastConnectTime(connectTime) - .build(); - return TransportProtos.ToCoreMsg.newBuilder() - .setDeviceConnectMsg(deviceConnectMsg) - .build(); - } - }, callback); - } - - @Override - public void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long activityTime, TbCallback callback) { - routeEvent(new ConnectivityEventInfo(tenantId, deviceId, activityTime) { - @Override - void forwardToLocalService() { - deviceStateService.ifPresent(service -> service.onDeviceActivity(tenantId, deviceId, activityTime)); - } - - @Override - TransportProtos.ToCoreMsg toQueueMsg() { - var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder() - .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) - .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) - .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) - .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) - .setLastActivityTime(activityTime) - .build(); - return TransportProtos.ToCoreMsg.newBuilder() - .setDeviceActivityMsg(deviceActivityMsg) - .build(); - } - }, callback); - } - - @Override - public void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId, long disconnectTime, TbCallback callback) { - routeEvent(new ConnectivityEventInfo(tenantId, deviceId, disconnectTime) { - @Override - void forwardToLocalService() { - deviceStateService.ifPresent(service -> service.onDeviceDisconnect(tenantId, deviceId, disconnectTime)); - } - - @Override - TransportProtos.ToCoreMsg toQueueMsg() { - var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder() - .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) - .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) - .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) - .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) - .setLastDisconnectTime(disconnectTime) - .build(); - return TransportProtos.ToCoreMsg.newBuilder() - .setDeviceDisconnectMsg(deviceDisconnectMsg) - .build(); - } - }, callback); - } - - @Override - public void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long inactivityTime, TbCallback callback) { - routeEvent(new ConnectivityEventInfo(tenantId, deviceId, inactivityTime) { - @Override - void forwardToLocalService() { - deviceStateService.ifPresent(service -> service.onDeviceInactivity(tenantId, deviceId, inactivityTime)); - } - - @Override - TransportProtos.ToCoreMsg toQueueMsg() { - var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder() - .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) - .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) - .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) - .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) - .setLastInactivityTime(inactivityTime) - .build(); - return TransportProtos.ToCoreMsg.newBuilder() - .setDeviceInactivityMsg(deviceInactivityMsg) - .build(); - } - }, callback); - } - - private void routeEvent(ConnectivityEventInfo eventInfo, TbCallback callback) { - var tenantId = eventInfo.getTenantId(); - var deviceId = eventInfo.getDeviceId(); - long eventTime = eventInfo.getEventTime(); - - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); - if (serviceInfoProvider.isService(ServiceType.TB_CORE) && tpi.isMyPartition() && deviceStateService.isPresent()) { - log.debug("[{}][{}] Forwarding device connectivity event to local service. Event time: [{}].", tenantId.getId(), deviceId.getId(), eventTime); - try { - eventInfo.forwardToLocalService(); - } catch (Exception e) { - log.error("[{}][{}] Failed to process device connectivity event. Event time: [{}].", tenantId.getId(), deviceId.getId(), eventTime, e); - callback.onFailure(e); - return; - } - callback.onSuccess(); - } else { - TransportProtos.ToCoreMsg msg = eventInfo.toQueueMsg(); - log.debug("[{}][{}] Sending device connectivity message to core. Event time: [{}].", tenantId.getId(), deviceId.getId(), eventTime); - clusterService.pushMsgToCore(tpi, UUID.randomUUID(), msg, new SimpleTbQueueCallback(__ -> callback.onSuccess(), callback::onFailure)); - } - } - -} diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java index 23adacfc2e..6d52d86012 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java @@ -161,9 +161,6 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene @Override public void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List ts, TbCallback callback) { onTimeSeriesUpdate(entityId, ts); - if (entityId.getEntityType() == EntityType.DEVICE) { - updateDeviceInactivityTimeout(tenantId, entityId, ts); - } callback.onSuccess(); } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 568d80787d..54e87348bb 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -31,6 +31,7 @@ import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.DeviceStateManager; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; @@ -38,9 +39,11 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -50,6 +53,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.util.KvUtils; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; +import org.thingsboard.server.service.state.DefaultDeviceStateService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import java.util.ArrayList; @@ -75,6 +79,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer private final TbEntityViewService tbEntityViewService; private final TbApiUsageReportClient apiUsageClient; private final TbApiUsageStateService apiUsageStateService; + private final DeviceStateManager deviceStateManager; private ExecutorService tsCallBackExecutor; @@ -85,12 +90,14 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer TimeseriesService tsService, @Lazy TbEntityViewService tbEntityViewService, TbApiUsageReportClient apiUsageClient, - TbApiUsageStateService apiUsageStateService) { + TbApiUsageStateService apiUsageStateService, + DeviceStateManager deviceStateManager) { this.attrService = attrService; this.tsService = tsService; this.tbEntityViewService = tbEntityViewService; this.apiUsageClient = apiUsageClient; this.apiUsageStateService = apiUsageStateService; + this.deviceStateManager = deviceStateManager; } @PostConstruct @@ -146,6 +153,14 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer saveFuture = Futures.immediateFuture(0); } + if (entityId.getEntityType() == EntityType.DEVICE && request.getStrategy().saveLatest()) { + findNewInactivityTimeout(request.getEntries()).ifPresent(newInactivityTimeout -> + addMainCallback(saveFuture, __ -> deviceStateManager.onDeviceInactivityTimeoutUpdate( + tenantId, new DeviceId(entityId.getId()), newInactivityTimeout, TbCallback.EMPTY) + ) + ); + } + addMainCallback(saveFuture, request.getCallback()); if (strategy.sendWsUpdate()) { addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); @@ -156,6 +171,21 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer return saveFuture; } + private static Optional findNewInactivityTimeout(List entries) { + return entries.stream() + .filter(entry -> Objects.equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT, entry.getKey())) + .findFirst() + .map(DefaultTelemetrySubscriptionService::parseAsLong); + } + + private static long parseAsLong(KvEntry kve) { + try { + return Long.parseLong(kve.getValueAsString()); + } catch (NumberFormatException e) { + return 0L; + } + } + @Override public void saveAttributes(AttributesSaveRequest request) { checkInternalEntity(request.getEntityId()); @@ -312,6 +342,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer addMainCallback(saveFuture, result -> callback.onSuccess(null), callback::onFailure); } + private void addMainCallback(ListenableFuture saveFuture, Consumer onSuccess) { + addMainCallback(saveFuture, onSuccess, null); + } + private void addMainCallback(ListenableFuture saveFuture, Consumer onSuccess, Consumer onFailure) { DonAsynchron.withCallback(saveFuture, onSuccess, onFailure, tsCallBackExecutor); } diff --git a/application/src/test/java/org/thingsboard/server/service/state/DefaultRuleEngineDeviceStateManagerTest.java b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateManagerTest.java similarity index 81% rename from application/src/test/java/org/thingsboard/server/service/state/DefaultRuleEngineDeviceStateManagerTest.java rename to application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateManagerTest.java index 28ea2d270a..8d2a747d2d 100644 --- a/application/src/test/java/org/thingsboard/server/service/state/DefaultRuleEngineDeviceStateManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateManagerTest.java @@ -52,7 +52,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; @ExtendWith(MockitoExtension.class) -public class DefaultRuleEngineDeviceStateManagerTest { +public class DefaultDeviceStateManagerTest { @Mock private DeviceStateService deviceStateServiceMock; @@ -71,7 +71,7 @@ public class DefaultRuleEngineDeviceStateManagerTest { @Captor private ArgumentCaptor queueCallbackCaptor; - private DefaultRuleEngineDeviceStateManager deviceStateManager; + private DefaultDeviceStateManager deviceStateManager; private static final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("57ab2e6c-bc4c-11ee-a506-0242ac120002")); private static final DeviceId DEVICE_ID = DeviceId.fromString("74a9053e-bc4c-11ee-a506-0242ac120002"); @@ -82,7 +82,7 @@ public class DefaultRuleEngineDeviceStateManagerTest { @BeforeEach public void setup() { - deviceStateManager = new DefaultRuleEngineDeviceStateManager(serviceInfoProviderMock, partitionServiceMock, Optional.of(deviceStateServiceMock), clusterServiceMock); + deviceStateManager = new DefaultDeviceStateManager(serviceInfoProviderMock, partitionServiceMock, Optional.of(deviceStateServiceMock), clusterServiceMock); } @ParameterizedTest @@ -90,7 +90,7 @@ public class DefaultRuleEngineDeviceStateManagerTest { "when onDeviceX() is called, then should route event to local service and call onSuccess() callback.") @MethodSource public void givenRoutedToLocalAndProcessingSuccess_whenOnDeviceAction_thenShouldCallLocalServiceAndSuccessCallback( - BiConsumer onDeviceAction, Consumer actionVerification + BiConsumer onDeviceAction, Consumer actionVerification ) { // GIVEN given(serviceInfoProviderMock.isService(ServiceType.TB_CORE)).willReturn(true); @@ -109,19 +109,19 @@ public class DefaultRuleEngineDeviceStateManagerTest { private static Stream givenRoutedToLocalAndProcessingSuccess_whenOnDeviceAction_thenShouldCallLocalServiceAndSuccessCallback() { return Stream.of( Arguments.of( - (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), (Consumer) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS) ), Arguments.of( - (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), (Consumer) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS) ), Arguments.of( - (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), (Consumer) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS) ), Arguments.of( - (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), (Consumer) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS) ) ); @@ -132,7 +132,7 @@ public class DefaultRuleEngineDeviceStateManagerTest { "when onDeviceX() is called, then should route event to local service and call onFailure() callback.") @MethodSource public void givenRoutedToLocalAndProcessingFailure_whenOnDeviceAction_thenShouldCallLocalServiceAndFailureCallback( - Consumer exceptionThrowSetup, BiConsumer onDeviceAction, Consumer actionVerification + Consumer exceptionThrowSetup, BiConsumer onDeviceAction, Consumer actionVerification ) { // GIVEN given(serviceInfoProviderMock.isService(ServiceType.TB_CORE)).willReturn(true); @@ -155,22 +155,22 @@ public class DefaultRuleEngineDeviceStateManagerTest { return Stream.of( Arguments.of( (Consumer) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS), - (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), (Consumer) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS) ), Arguments.of( (Consumer) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS), - (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), (Consumer) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS) ), Arguments.of( (Consumer) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS), - (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), (Consumer) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS) ), Arguments.of( (Consumer) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS), - (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), (Consumer) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS) ) ); @@ -181,7 +181,7 @@ public class DefaultRuleEngineDeviceStateManagerTest { "when onDeviceX() is called, then should send correct queue message to external service with correct callback object.") @MethodSource public void givenRoutedToExternal_whenOnDeviceAction_thenShouldSendQueueMsgToExternalServiceWithCorrectCallback( - BiConsumer onDeviceAction, BiConsumer> actionVerification + BiConsumer onDeviceAction, BiConsumer> actionVerification ) { // WHEN ReflectionTestUtils.setField(deviceStateManager, "deviceStateService", Optional.empty()); @@ -203,7 +203,7 @@ public class DefaultRuleEngineDeviceStateManagerTest { private static Stream givenRoutedToExternal_whenOnDeviceAction_thenShouldSendQueueMsgToExternalServiceWithCorrectCallback() { return Stream.of( Arguments.of( - (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), (BiConsumer>) (clusterServiceMock, queueCallbackCaptor) -> { var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder() .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) @@ -219,7 +219,7 @@ public class DefaultRuleEngineDeviceStateManagerTest { } ), Arguments.of( - (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), (BiConsumer>) (clusterServiceMock, queueCallbackCaptor) -> { var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder() .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) @@ -235,7 +235,7 @@ public class DefaultRuleEngineDeviceStateManagerTest { } ), Arguments.of( - (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), (BiConsumer>) (clusterServiceMock, queueCallbackCaptor) -> { var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder() .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) @@ -251,7 +251,7 @@ public class DefaultRuleEngineDeviceStateManagerTest { } ), Arguments.of( - (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), (BiConsumer>) (clusterServiceMock, queueCallbackCaptor) -> { var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder() .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index dbd61b638e..c04b92e6e4 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -29,6 +29,7 @@ import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.rule.engine.api.DeviceStateManager; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageRecordKey; @@ -125,12 +126,14 @@ class DefaultTelemetrySubscriptionServiceTest { TbApiUsageReportClient apiUsageClient; @Mock TbApiUsageStateService apiUsageStateService; + @Mock + DeviceStateManager deviceStateManager; DefaultTelemetrySubscriptionService telemetryService; @BeforeEach void setup() { - telemetryService = new DefaultTelemetrySubscriptionService(attrService, tsService, tbEntityViewService, apiUsageClient, apiUsageStateService); + telemetryService = new DefaultTelemetrySubscriptionService(attrService, tsService, tbEntityViewService, apiUsageClient, apiUsageStateService, deviceStateManager); ReflectionTestUtils.setField(telemetryService, "clusterService", clusterService); ReflectionTestUtils.setField(telemetryService, "partitionService", partitionService); ReflectionTestUtils.setField(telemetryService, "subscriptionManagerService", Optional.of(subscriptionManagerService)); diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 1ecd4e6a3a..8227624af5 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -772,6 +772,14 @@ message DeviceInactivityProto { int64 lastInactivityTime = 5; } +message DeviceInactivityTimeoutUpdateProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 deviceIdMSB = 3; + int64 deviceIdLSB = 4; + int64 inactivityTimeout = 5; +} + //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. message SubscriptionInfoProto { int64 lastActivityTime = 1; @@ -1515,6 +1523,7 @@ message ToCoreMsg { DeviceConnectProto deviceConnectMsg = 50; DeviceDisconnectProto deviceDisconnectMsg = 51; DeviceInactivityProto deviceInactivityMsg = 52; + DeviceInactivityTimeoutUpdateProto deviceInactivityTimeoutUpdateMsg = 53; } /* High priority messages with low latency are handled by ThingsBoard Core Service separately */ diff --git a/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java b/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java index 008158246e..0f1a56cb17 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java +++ b/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java @@ -36,6 +36,9 @@ public class DonAsynchron { FutureCallback callback = new FutureCallback() { @Override public void onSuccess(T result) { + if (onSuccess == null) { + return; + } try { onSuccess.accept(result); } catch (Throwable th) { @@ -45,6 +48,9 @@ public class DonAsynchron { @Override public void onFailure(Throwable t) { + if (onFailure == null) { + return; + } onFailure.accept(t); } }; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceStateManager.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/DeviceStateManager.java similarity index 88% rename from rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceStateManager.java rename to rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/DeviceStateManager.java index fb3e282c9a..887f7ecaa2 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceStateManager.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/DeviceStateManager.java @@ -19,7 +19,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.TbCallback; -public interface RuleEngineDeviceStateManager { +public interface DeviceStateManager { void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long connectTime, TbCallback callback); @@ -29,4 +29,6 @@ public interface RuleEngineDeviceStateManager { void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long inactivityTime, TbCallback callback); + void onDeviceInactivityTimeoutUpdate(TenantId tenantId, DeviceId deviceId, long inactivityTimeout, TbCallback callback); + } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 46b63e0595..e4c07c4ddb 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -279,7 +279,7 @@ public interface TbContext { DeviceCredentialsService getDeviceCredentialsService(); - RuleEngineDeviceStateManager getDeviceStateManager(); + DeviceStateManager getDeviceStateManager(); String getDeviceStateNodeRateLimitConfig(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java index 3aed05d372..c9a50cf88d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java @@ -17,7 +17,7 @@ package org.thingsboard.rule.engine.action; import lombok.extern.slf4j.Slf4j; import org.springframework.util.ConcurrentReferenceHashMap; -import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; +import org.thingsboard.rule.engine.api.DeviceStateManager; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; @@ -119,7 +119,7 @@ public class TbDeviceStateNode implements TbNode { TenantId tenantId = ctx.getTenantId(); long eventTs = msg.getMetaDataTs(); - RuleEngineDeviceStateManager deviceStateManager = ctx.getDeviceStateManager(); + DeviceStateManager deviceStateManager = ctx.getDeviceStateManager(); TbCallback callback = getMsgEnqueuedCallback(ctx, msg); switch (event) { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeviceStateNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeviceStateNodeTest.java index b31dffdc01..2c3c61150e 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeviceStateNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeviceStateNodeTest.java @@ -29,7 +29,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; import org.springframework.util.ConcurrentReferenceHashMap; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; +import org.thingsboard.rule.engine.api.DeviceStateManager; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; @@ -66,7 +66,7 @@ public class TbDeviceStateNodeTest { @Mock private TbContext ctxMock; @Mock - private RuleEngineDeviceStateManager deviceStateManagerMock; + private DeviceStateManager deviceStateManagerMock; @Captor private ArgumentCaptor callbackCaptor; private TbDeviceStateNode node; @@ -263,7 +263,7 @@ public class TbDeviceStateNodeTest { @ParameterizedTest @MethodSource - public void givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback(TbMsgType supportedEventType, BiConsumer> actionVerification) { + public void givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback(TbMsgType supportedEventType, BiConsumer> actionVerification) { // GIVEN given(ctxMock.getTenantId()).willReturn(TENANT_ID); given(ctxMock.getDeviceStateNodeRateLimitConfig()).willReturn("1:1"); @@ -297,10 +297,10 @@ public class TbDeviceStateNodeTest { private static Stream givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback() { return Stream.of( - Arguments.of(TbMsgType.CONNECT_EVENT, (BiConsumer>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceConnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())), - Arguments.of(TbMsgType.ACTIVITY_EVENT, (BiConsumer>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceActivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())), - Arguments.of(TbMsgType.DISCONNECT_EVENT, (BiConsumer>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceDisconnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())), - Arguments.of(TbMsgType.INACTIVITY_EVENT, (BiConsumer>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceInactivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())) + Arguments.of(TbMsgType.CONNECT_EVENT, (BiConsumer>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceConnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())), + Arguments.of(TbMsgType.ACTIVITY_EVENT, (BiConsumer>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceActivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())), + Arguments.of(TbMsgType.DISCONNECT_EVENT, (BiConsumer>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceDisconnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())), + Arguments.of(TbMsgType.INACTIVITY_EVENT, (BiConsumer>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceInactivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())) ); } From b90522cc9d61837330fb9d76edb3edf382eaafeb Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Fri, 28 Feb 2025 15:56:21 +0200 Subject: [PATCH 2/4] Save time series strategies: tests for inactivity timeout update notification --- .../DefaultTelemetrySubscriptionService.java | 2 +- .../DefaultTbCoreConsumerServiceTest.java | 93 +++++++++++ .../state/DefaultDeviceStateManagerTest.java | 25 +++ ...faultTelemetrySubscriptionServiceTest.java | 158 ++++++++++++++++++ 4 files changed, 277 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 54e87348bb..b643413bfa 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -153,7 +153,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer saveFuture = Futures.immediateFuture(0); } - if (entityId.getEntityType() == EntityType.DEVICE && request.getStrategy().saveLatest()) { + if (entityId.getEntityType() == EntityType.DEVICE && request.getStrategy().saveLatest() /* Device State Service reads from the latest values when initializing */) { findNewInactivityTimeout(request.getEntries()).ifPresent(newInactivityTimeout -> addMainCallback(saveFuture, __ -> deviceStateManager.onDeviceInactivityTimeoutUpdate( tenantId, new DeviceId(entityId.getId()), newInactivityTimeout, TbCallback.EMPTY) diff --git a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java index fe222237c6..26832f6176 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java @@ -532,6 +532,98 @@ public class DefaultTbCoreConsumerServiceTest { then(statsMock).should(never()).log(inactivityMsg); } + @Test + public void givenProcessingSuccess_whenForwardingInactivityTimeoutUpdateMsgToStateService_thenOnSuccessCallbackIsCalled() { + // GIVEN + var inactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setInactivityTimeout(time) + .build(); + + doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock); + + // WHEN + defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock); + + // THEN + then(stateServiceMock).should().onDeviceInactivityTimeoutUpdate(tenantId, deviceId, time); + then(tbCallbackMock).should().onSuccess(); + then(tbCallbackMock).should(never()).onFailure(any()); + } + + @Test + public void givenProcessingFailure_whenForwardingInactivityTimeoutUpdateMsgToStateService_thenOnFailureCallbackIsCalled() { + // GIVEN + var inactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setInactivityTimeout(time) + .build(); + + doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock); + + var runtimeException = new RuntimeException("Something bad happened!"); + doThrow(runtimeException).when(stateServiceMock).onDeviceInactivityTimeoutUpdate(tenantId, deviceId, time); + + // WHEN + defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock); + + // THEN + then(tbCallbackMock).should(never()).onSuccess(); + then(tbCallbackMock).should().onFailure(runtimeException); + } + + @Test + public void givenStatsEnabled_whenForwardingInactivityTimeoutUpdateMsgToStateService_thenStatsAreRecorded() { + // GIVEN + ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock); + ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", true); + + var inactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setInactivityTimeout(time) + .build(); + + doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock); + + // WHEN + defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock); + + // THEN + then(statsMock).should().log(inactivityTimeoutUpdateMsg); + } + + @Test + public void givenStatsDisabled_whenForwardingInactivityTimeoutUpdateMsgToStateService_thenStatsAreNotRecorded() { + // GIVEN + ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock); + ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", false); + + var inactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setInactivityTimeout(time) + .build(); + + doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock); + + // WHEN + defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock); + + // THEN + then(statsMock).should(never()).log(inactivityTimeoutUpdateMsg); + } + @Test public void givenRestApiCallResponseMsgProto_whenForwardToRuleEngineCallService_thenCallOnQueueMsg() { // GIVEN @@ -545,4 +637,5 @@ public class DefaultTbCoreConsumerServiceTest { // THEN then(ruleEngineCallServiceMock).should().onQueueMsg(restApiCallResponseMsgProto, tbCallbackMock); } + } diff --git a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateManagerTest.java b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateManagerTest.java index 8d2a747d2d..6cf0ab9053 100644 --- a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateManagerTest.java @@ -123,6 +123,10 @@ public class DefaultDeviceStateManagerTest { Arguments.of( (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), (Consumer) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS) + ), + Arguments.of( + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (Consumer) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS) ) ); } @@ -172,6 +176,11 @@ public class DefaultDeviceStateManagerTest { (Consumer) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS), (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), (Consumer) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS) + ), + Arguments.of( + (Consumer) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS), + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (Consumer) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS) ) ); } @@ -265,6 +274,22 @@ public class DefaultDeviceStateManagerTest { .build(); then(clusterServiceMock).should().pushMsgToCore(eq(EXTERNAL_TPI), any(UUID.class), eq(toCoreMsg), queueCallbackCaptor.capture()); } + ), + Arguments.of( + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (BiConsumer>) (clusterServiceMock, queueCallbackCaptor) -> { + var deviceInactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder() + .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) + .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) + .setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits()) + .setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits()) + .setInactivityTimeout(EVENT_TS) + .build(); + var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder() + .setDeviceInactivityTimeoutUpdateMsg(deviceInactivityTimeoutUpdateMsg) + .build(); + then(clusterServiceMock).should().pushMsgToCore(eq(EXTERNAL_TPI), any(UUID.class), eq(toCoreMsg), queueCallbackCaptor.capture()); + } ) ); } diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index c04b92e6e4..5537cfbb4b 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -35,15 +36,19 @@ import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageStateValue; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.id.ApiUsageStateId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.objects.AttributesEntityView; import org.thingsboard.server.common.data.objects.TelemetryEntityView; @@ -72,11 +77,16 @@ import java.util.concurrent.ExecutorService; import java.util.stream.LongStream; import java.util.stream.Stream; +import static com.google.common.util.concurrent.Futures.immediateFailedFuture; import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.never; @ExtendWith(MockitoExtension.class) class DefaultTelemetrySubscriptionServiceTest { @@ -368,6 +378,154 @@ class DefaultTelemetrySubscriptionServiceTest { ); } + @Test + void shouldThrowErrorWhenTryingToSaveTimeseriesForApiUsageState() { + // GIVEN + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(new ApiUsageStateId(UUID.randomUUID())) + .entries(sampleTelemetry) + .strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL) + .callback(emptyCallback) + .build(); + + // WHEN + assertThatThrownBy(() -> telemetryService.saveTimeseries(request)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Can't update API Usage State!"); + + // THEN + then(tsService).shouldHaveNoInteractions(); + then(deviceStateManager).shouldHaveNoInteractions(); + } + + @Test + void shouldNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesWasSavedToLatest() { + // GIVEN + var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); + var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L)); + + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(deviceId) + .entry(inactivityTimeout) + .strategy(new TimeseriesSaveRequest.Strategy(false, true, false)) + .callback(emptyCallback) + .build(); + + given(tsService.saveLatest(tenantId, deviceId, List.of(inactivityTimeout))).willReturn(immediateFuture(listOfNNumbers(1))); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + then(deviceStateManager).should().onDeviceInactivityTimeoutUpdate(tenantId, deviceId, 5000L, TbCallback.EMPTY); + } + + @ParameterizedTest + @EnumSource( + value = EntityType.class, + names = {"DEVICE", "API_USAGE_STATE"}, // API usage state excluded due to coverage in another test + mode = EnumSource.Mode.EXCLUDE + ) + void shouldNotNotifyDeviceStateManagerWhenInactivityTimeoutTimeseriesWasUpdatedButEntityTypeIsNotDevice(EntityType entityType) { + // GIVEN + var nonDeviceId = EntityIdFactory.getByTypeAndUuid(entityType, "cc51e450-53e1-11ee-883e-e56b48fd2088"); + var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L)); + + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(nonDeviceId) + .entry(inactivityTimeout) + .strategy(new TimeseriesSaveRequest.Strategy(false, true, false)) + .callback(emptyCallback) + .build(); + + given(tsService.saveLatest(tenantId, nonDeviceId, List.of(inactivityTimeout))).willReturn(immediateFuture(listOfNNumbers(1))); + lenient().when(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, nonDeviceId)).thenReturn(immediateFuture(Collections.emptyList())); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any()); + } + + @Test + void shouldNotNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesWasNotSavedToLatest() { + // GIVEN + var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); + var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L)); + + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(deviceId) + .entry(inactivityTimeout) + .strategy(new TimeseriesSaveRequest.Strategy(true, false, true)) + .callback(emptyCallback) + .build(); + + given(tsService.saveWithoutLatest(tenantId, deviceId, List.of(inactivityTimeout), 0L)).willReturn(immediateFuture(1)); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any()); + } + + @Test + void shouldNotNotifyDeviceStateManagerWhenInactivityTimeoutTimeseriesWasNotUpdated() { + // GIVEN + var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); + var notInactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("notInactivityTimeout", 5000L)); + + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(deviceId) + .entry(notInactivityTimeout) + .strategy(new TimeseriesSaveRequest.Strategy(false, true, false)) + .callback(emptyCallback) + .build(); + + given(tsService.saveLatest(tenantId, deviceId, List.of(notInactivityTimeout))).willReturn(immediateFuture(listOfNNumbers(1))); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any()); + } + + @Test + void shouldNotNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesSaveFailed() { + // GIVEN + var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); + var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L)); + + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(deviceId) + .entry(inactivityTimeout) + .strategy(new TimeseriesSaveRequest.Strategy(false, true, false)) + .callback(emptyCallback) + .build(); + + given(tsService.saveLatest(tenantId, deviceId, List.of(inactivityTimeout))).willReturn(immediateFailedFuture(new RuntimeException("failed to save"))); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any()); + } + // used to emulate sequence numbers returned by save latest API private static List listOfNNumbers(int N) { return LongStream.range(0, N).boxed().toList(); From 877362def0172e1e27c305bfec0a2e1453e8cb89 Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Fri, 28 Feb 2025 17:33:53 +0200 Subject: [PATCH 3/4] Save time series strategies: after-merge fixes --- .../telemetry/DefaultTelemetrySubscriptionServiceTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index 5756ecd125..b1c84184d2 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -389,7 +389,6 @@ class DefaultTelemetrySubscriptionServiceTest { ); } - // used to emulate versions returned by save latest API @Test void shouldThrowErrorWhenTryingToSaveTimeseriesForApiUsageState() { // GIVEN @@ -477,7 +476,7 @@ class DefaultTelemetrySubscriptionServiceTest { .strategy(new TimeseriesSaveRequest.Strategy(true, false, true, true)) .build(); - given(tsService.saveWithoutLatest(tenantId, deviceId, List.of(inactivityTimeout), 0L)).willReturn(immediateFuture(TimeseriesSaveResult.of(1, listOfNNumbers(1)))); + given(tsService.saveWithoutLatest(tenantId, deviceId, List.of(inactivityTimeout), 0L)).willReturn(immediateFuture(TimeseriesSaveResult.of(1, null))); // WHEN telemetryService.saveTimeseries(request); From fe16d104116fed36e89053140025a06ad4c149c1 Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Thu, 6 Mar 2025 14:25:04 +0200 Subject: [PATCH 4/4] Save time series strategies: always handle inactivity timeout as a server attribute --- .../rpc/sync/DefaultEdgeRequestsService.java | 5 +- .../state/DefaultDeviceStateService.java | 77 ++++---- .../DefaultSubscriptionManagerService.java | 3 - .../DefaultTelemetrySubscriptionService.java | 32 +--- .../state/DefaultDeviceStateServiceTest.java | 40 ---- ...faultTelemetrySubscriptionServiceTest.java | 176 +++--------------- 6 files changed, 67 insertions(+), 266 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index a371582409..6fc6d5bad9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -152,8 +152,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { entityData = new HashMap<>(); attributes = JacksonUtil.newObjectNode(); for (AttributeKvEntry attr : ssAttributes) { - if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(attr.getKey()) - && !DefaultDeviceStateService.INACTIVITY_TIMEOUT.equals(attr.getKey())) { + if (DefaultDeviceStateService.ACTIVITY_KEYS_WITHOUT_INACTIVITY_TIMEOUT.contains(attr.getKey())) { continue; } if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) { @@ -200,7 +199,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { } Map> tsData = new HashMap<>(); for (TsKvEntry tsKvEntry : tsKvEntries) { - if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(tsKvEntry.getKey())) { + if (DefaultDeviceStateService.ACTIVITY_KEYS_WITH_INACTIVITY_TIMEOUT.contains(tsKvEntry.getKey())) { continue; } tsData.computeIfAbsent(tsKvEntry.getTs(), k -> new HashMap<>()).put(tsKvEntry.getKey(), tsKvEntry.getValue()); diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 11819ce5d8..f8cb1a1059 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -96,6 +96,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.UUID; @@ -129,11 +130,10 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService PERSISTENT_TELEMETRY_KEYS = Arrays.asList( new EntityKey(EntityKeyType.TIME_SERIES, LAST_ACTIVITY_TIME), new EntityKey(EntityKeyType.TIME_SERIES, INACTIVITY_ALARM_TIME), - new EntityKey(EntityKeyType.TIME_SERIES, INACTIVITY_TIMEOUT), new EntityKey(EntityKeyType.TIME_SERIES, ACTIVITY_STATE), new EntityKey(EntityKeyType.TIME_SERIES, LAST_CONNECT_TIME), new EntityKey(EntityKeyType.TIME_SERIES, LAST_DISCONNECT_TIME), - new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT)); + new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT)); // inactivity timeout is always a server attribute, even when activity data is stored as time series private static final List PERSISTENT_ATTRIBUTE_KEYS = Arrays.asList( new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, LAST_ACTIVITY_TIME), @@ -143,8 +143,14 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService PERSISTENT_ATTRIBUTES = Arrays.asList(ACTIVITY_STATE, LAST_CONNECT_TIME, - LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT); + public static final List ACTIVITY_KEYS_WITHOUT_INACTIVITY_TIMEOUT = List.of( + ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME + ); + + public static final List ACTIVITY_KEYS_WITH_INACTIVITY_TIMEOUT = List.of( + ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT + ); + private static final List PERSISTENT_ENTITY_FIELDS = Arrays.asList( new EntityKey(EntityKeyType.ENTITY_FIELD, "name"), new EntityKey(EntityKeyType.ENTITY_FIELD, "type"), @@ -643,41 +649,45 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService fetchDeviceState(Device device) { ListenableFuture future; if (persistToTelemetry) { - ListenableFuture> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES); - future = Futures.transform(tsData, extractDeviceStateData(device), MoreExecutors.directExecutor()); + ListenableFuture> timeseriesActivityDataFuture = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), ACTIVITY_KEYS_WITHOUT_INACTIVITY_TIMEOUT); + ListenableFuture> inactivityTimeoutAttributeFuture = attributesService.find( + TenantId.SYS_TENANT_ID, device.getId(), AttributeScope.SERVER_SCOPE, INACTIVITY_TIMEOUT + ); + + ListenableFuture> fullActivityDataFuture = Futures.whenAllSucceed(timeseriesActivityDataFuture, inactivityTimeoutAttributeFuture).call(() -> { + List activityTimeseries = Futures.getDone(timeseriesActivityDataFuture); + Optional inactivityTimeoutAttribute = Futures.getDone(inactivityTimeoutAttributeFuture); + + List result; + if (inactivityTimeoutAttribute.isPresent()) { + result = new ArrayList<>(activityTimeseries.size() + 1); + result.addAll(activityTimeseries); + inactivityTimeoutAttribute.ifPresent(result::add); + } else { + return activityTimeseries; + } + + return result; + }, deviceStateCallbackExecutor); + + future = Futures.transform(fullActivityDataFuture, extractDeviceStateData(device), MoreExecutors.directExecutor()); } else { - ListenableFuture> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), AttributeScope.SERVER_SCOPE, PERSISTENT_ATTRIBUTES); - future = Futures.transform(attrData, extractDeviceStateData(device), MoreExecutors.directExecutor()); + ListenableFuture> attributesActivityDataFuture = attributesService.find( + TenantId.SYS_TENANT_ID, device.getId(), AttributeScope.SERVER_SCOPE, ACTIVITY_KEYS_WITH_INACTIVITY_TIMEOUT + ); + future = Futures.transform(attributesActivityDataFuture, extractDeviceStateData(device), MoreExecutors.directExecutor()); } - return transformInactivityTimeout(future); + return future; } - private ListenableFuture transformInactivityTimeout(ListenableFuture future) { - return Futures.transformAsync(future, deviceStateData -> { - if (!persistToTelemetry || deviceStateData.getState().getInactivityTimeout() != defaultInactivityTimeoutMs) { - return future; //fail fast - } - var attributesFuture = attributesService.find(TenantId.SYS_TENANT_ID, deviceStateData.getDeviceId(), AttributeScope.SERVER_SCOPE, INACTIVITY_TIMEOUT); - return Futures.transform(attributesFuture, attributes -> { - attributes.flatMap(KvEntry::getLongValue).ifPresent((inactivityTimeout) -> { - if (inactivityTimeout > 0) { - deviceStateData.getState().setInactivityTimeout(inactivityTimeout); - } - }); - return deviceStateData; - }, MoreExecutors.directExecutor()); - }, deviceStateCallbackExecutor); - } - - private Function, DeviceStateData> extractDeviceStateData(Device device) { + private Function, DeviceStateData> extractDeviceStateData(Device device) { return new Function<>() { @Nonnull @Override - public DeviceStateData apply(@Nullable List data) { + public DeviceStateData apply(@Nullable List data) { try { long lastActivityTime = getEntryValue(data, LAST_ACTIVITY_TIME, 0L); long inactivityAlarmTime = getEntryValue(data, INACTIVITY_ALARM_TIME, 0L); @@ -690,7 +700,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService 0 ? inactivityTimeout : defaultInactivityTimeoutMs) .build(); TbMsgMetaData md = new TbMsgMetaData(); md.putValue("deviceName", device.getName()); @@ -761,12 +771,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService keys, TbCallback callback) { onTimeSeriesUpdate(entityId, keys.stream().map(key -> new BasicTsKvEntry(0, new StringDataEntry(key, ""))).collect(Collectors.toList())); - if (entityId.getEntityType() == EntityType.DEVICE) { - deleteDeviceInactivityTimeout(tenantId, entityId, keys); - } callback.onSuccess(); } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index d0507d4fee..36cd5516a8 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -31,7 +31,6 @@ import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; -import org.thingsboard.rule.engine.api.DeviceStateManager; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; @@ -39,11 +38,9 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.id.CustomerId; -import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; @@ -55,7 +52,6 @@ import org.thingsboard.server.dao.util.KvUtils; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; -import org.thingsboard.server.service.state.DefaultDeviceStateService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import java.util.ArrayList; @@ -82,7 +78,6 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer private final TbApiUsageReportClient apiUsageClient; private final TbApiUsageStateService apiUsageStateService; private final CalculatedFieldQueueService calculatedFieldQueueService; - private final DeviceStateManager deviceStateManager; private ExecutorService tsCallBackExecutor; @@ -94,15 +89,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Lazy TbEntityViewService tbEntityViewService, TbApiUsageReportClient apiUsageClient, TbApiUsageStateService apiUsageStateService, - CalculatedFieldQueueService calculatedFieldQueueService, - DeviceStateManager deviceStateManager) { + CalculatedFieldQueueService calculatedFieldQueueService) { this.attrService = attrService; this.tsService = tsService; this.tbEntityViewService = tbEntityViewService; this.apiUsageClient = apiUsageClient; this.apiUsageStateService = apiUsageStateService; this.calculatedFieldQueueService = calculatedFieldQueueService; - this.deviceStateManager = deviceStateManager; } @PostConstruct @@ -165,14 +158,6 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } }, t -> request.getCallback().onFailure(t)); - if (entityId.getEntityType() == EntityType.DEVICE && request.getStrategy().saveLatest() /* Device State Service reads from the latest values when initializing */) { - findNewInactivityTimeout(request.getEntries()).ifPresent(newInactivityTimeout -> - addMainCallback(resultFuture, __ -> deviceStateManager.onDeviceInactivityTimeoutUpdate( - tenantId, new DeviceId(entityId.getId()), newInactivityTimeout, TbCallback.EMPTY) - ) - ); - } - if (strategy.sendWsUpdate()) { addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); } @@ -182,21 +167,6 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer return resultFuture; } - private static Optional findNewInactivityTimeout(List entries) { - return entries.stream() - .filter(entry -> Objects.equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT, entry.getKey())) - .findFirst() - .map(DefaultTelemetrySubscriptionService::parseAsLong); - } - - private static long parseAsLong(KvEntry kve) { - try { - return Long.parseLong(kve.getValueAsString()); - } catch (NumberFormatException e) { - return 0L; - } - } - @Override public void saveAttributes(AttributesSaveRequest request) { checkInternalEntity(request.getEntityId()); diff --git a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java index 26c978bbd0..4e36a44957 100644 --- a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java @@ -38,9 +38,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityTrigger; import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.query.EntityData; -import org.thingsboard.server.common.data.query.EntityKeyType; -import org.thingsboard.server.common.data.query.TsValue; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; @@ -88,7 +85,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.thingsboard.server.service.state.DefaultDeviceStateService.ACTIVITY_STATE; import static org.thingsboard.server.service.state.DefaultDeviceStateService.INACTIVITY_ALARM_TIME; -import static org.thingsboard.server.service.state.DefaultDeviceStateService.INACTIVITY_TIMEOUT; import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_ACTIVITY_TIME; import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_CONNECT_TIME; import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_DISCONNECT_TIME; @@ -508,42 +504,6 @@ public class DefaultDeviceStateServiceTest { verify(service).fetchDeviceStateDataUsingSeparateRequests(deviceId); } - @Test - public void givenPersistToTelemetryAndDefaultInactivityTimeoutFetched_whenTransformingToDeviceStateData_thenTryGetInactivityFromAttribute() { - var defaultInactivityTimeoutInSec = 60L; - var latest = - Map.of( - EntityKeyType.TIME_SERIES, Map.of(INACTIVITY_TIMEOUT, new TsValue(0, Long.toString(defaultInactivityTimeoutInSec * 1000))), - EntityKeyType.SERVER_ATTRIBUTE, Map.of(INACTIVITY_TIMEOUT, new TsValue(0, Long.toString(5000L))) - ); - - process(latest, defaultInactivityTimeoutInSec); - } - - @Test - public void givenPersistToTelemetryAndNoInactivityTimeoutFetchedFromTimeSeries_whenTransformingToDeviceStateData_thenTryGetInactivityFromAttribute() { - var defaultInactivityTimeoutInSec = 60L; - var latest = - Map.of( - EntityKeyType.SERVER_ATTRIBUTE, Map.of(INACTIVITY_TIMEOUT, new TsValue(0, Long.toString(5000L))) - ); - - process(latest, defaultInactivityTimeoutInSec); - } - - private void process(Map> latest, long defaultInactivityTimeoutInSec) { - service.setDefaultInactivityTimeoutInSec(defaultInactivityTimeoutInSec); - service.setDefaultInactivityTimeoutMs(defaultInactivityTimeoutInSec * 1000); - service.setPersistToTelemetry(true); - - var deviceUuid = UUID.randomUUID(); - var deviceId = new DeviceId(deviceUuid); - - DeviceStateData deviceStateData = service.toDeviceStateData(new EntityData(deviceId, latest, Map.of()), new DeviceIdInfo(TenantId.SYS_TENANT_ID.getId(), UUID.randomUUID(), deviceUuid)); - - assertThat(deviceStateData.getState().getInactivityTimeout()).isEqualTo(5000L); - } - private void initStateService(long timeout) throws InterruptedException { service.stop(); reset(service, telemetrySubscriptionService); diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index b1c84184d2..6416fb4f98 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -24,30 +24,25 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; -import org.thingsboard.rule.engine.api.DeviceStateManager; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageStateValue; -import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.id.ApiUsageStateId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; -import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.objects.AttributesEntityView; @@ -78,17 +73,14 @@ import java.util.concurrent.ExecutorService; import java.util.stream.LongStream; import java.util.stream.Stream; -import static com.google.common.util.concurrent.Futures.immediateFailedFuture; import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.never; @ExtendWith(MockitoExtension.class) class DefaultTelemetrySubscriptionServiceTest { @@ -132,14 +124,12 @@ class DefaultTelemetrySubscriptionServiceTest { TbApiUsageStateService apiUsageStateService; @Mock CalculatedFieldQueueService calculatedFieldQueueService; - @Mock - DeviceStateManager deviceStateManager; DefaultTelemetrySubscriptionService telemetryService; @BeforeEach void setup() { - telemetryService = new DefaultTelemetrySubscriptionService(attrService, tsService, tbEntityViewService, apiUsageClient, apiUsageStateService, calculatedFieldQueueService, deviceStateManager); + telemetryService = new DefaultTelemetrySubscriptionService(attrService, tsService, tbEntityViewService, apiUsageClient, apiUsageStateService, calculatedFieldQueueService); ReflectionTestUtils.setField(telemetryService, "clusterService", clusterService); ReflectionTestUtils.setField(telemetryService, "partitionService", partitionService); ReflectionTestUtils.setField(telemetryService, "subscriptionManagerService", Optional.of(subscriptionManagerService)); @@ -180,6 +170,28 @@ class DefaultTelemetrySubscriptionServiceTest { tsCallBackExecutor.shutdownNow(); } + /* --- Save time series API --- */ + + @Test + void shouldThrowErrorWhenTryingToSaveTimeseriesForApiUsageState() { + // GIVEN + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(new ApiUsageStateId(UUID.randomUUID())) + .entries(sampleTelemetry) + .strategy(TimeseriesSaveRequest.Strategy.PROCESS_ALL) + .build(); + + // WHEN + assertThatThrownBy(() -> telemetryService.saveTimeseries(request)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Can't update API Usage State!"); + + // THEN + then(tsService).shouldHaveNoInteractions(); + } + @Test void shouldReportStorageDataPointsApiUsageWhenTimeSeriesIsSaved() { // GIVEN @@ -389,148 +401,6 @@ class DefaultTelemetrySubscriptionServiceTest { ); } - @Test - void shouldThrowErrorWhenTryingToSaveTimeseriesForApiUsageState() { - // GIVEN - var request = TimeseriesSaveRequest.builder() - .tenantId(tenantId) - .customerId(customerId) - .entityId(new ApiUsageStateId(UUID.randomUUID())) - .entries(sampleTelemetry) - .strategy(TimeseriesSaveRequest.Strategy.PROCESS_ALL) - .build(); - - // WHEN - assertThatThrownBy(() -> telemetryService.saveTimeseries(request)) - .isInstanceOf(RuntimeException.class) - .hasMessage("Can't update API Usage State!"); - - // THEN - then(tsService).shouldHaveNoInteractions(); - then(deviceStateManager).shouldHaveNoInteractions(); - } - - @Test - void shouldNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesWasSavedToLatest() { - // GIVEN - var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); - var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L)); - - var request = TimeseriesSaveRequest.builder() - .tenantId(tenantId) - .customerId(customerId) - .entityId(deviceId) - .entry(inactivityTimeout) - .strategy(new TimeseriesSaveRequest.Strategy(false, true, false, false)) - .build(); - - given(tsService.saveLatest(tenantId, deviceId, List.of(inactivityTimeout))).willReturn(immediateFuture(TimeseriesSaveResult.of(1, listOfNNumbers(1)))); - - // WHEN - telemetryService.saveTimeseries(request); - - // THEN - then(deviceStateManager).should().onDeviceInactivityTimeoutUpdate(tenantId, deviceId, 5000L, TbCallback.EMPTY); - } - - @ParameterizedTest - @EnumSource( - value = EntityType.class, - names = {"DEVICE", "API_USAGE_STATE"}, // API usage state excluded due to coverage in another test - mode = EnumSource.Mode.EXCLUDE - ) - void shouldNotNotifyDeviceStateManagerWhenInactivityTimeoutTimeseriesWasUpdatedButEntityTypeIsNotDevice(EntityType entityType) { - // GIVEN - var nonDeviceId = EntityIdFactory.getByTypeAndUuid(entityType, "cc51e450-53e1-11ee-883e-e56b48fd2088"); - var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L)); - - var request = TimeseriesSaveRequest.builder() - .tenantId(tenantId) - .customerId(customerId) - .entityId(nonDeviceId) - .entry(inactivityTimeout) - .strategy(new TimeseriesSaveRequest.Strategy(false, true, false, false)) - .build(); - - given(tsService.saveLatest(tenantId, nonDeviceId, List.of(inactivityTimeout))).willReturn(immediateFuture(TimeseriesSaveResult.of(1, listOfNNumbers(1)))); - lenient().when(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, nonDeviceId)).thenReturn(immediateFuture(Collections.emptyList())); - - // WHEN - telemetryService.saveTimeseries(request); - - // THEN - then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any()); - } - - @Test - void shouldNotNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesWasNotSavedToLatest() { - // GIVEN - var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); - var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L)); - - var request = TimeseriesSaveRequest.builder() - .tenantId(tenantId) - .customerId(customerId) - .entityId(deviceId) - .entry(inactivityTimeout) - .strategy(new TimeseriesSaveRequest.Strategy(true, false, true, true)) - .build(); - - given(tsService.saveWithoutLatest(tenantId, deviceId, List.of(inactivityTimeout), 0L)).willReturn(immediateFuture(TimeseriesSaveResult.of(1, null))); - - // WHEN - telemetryService.saveTimeseries(request); - - // THEN - then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any()); - } - - @Test - void shouldNotNotifyDeviceStateManagerWhenInactivityTimeoutTimeseriesWasNotUpdated() { - // GIVEN - var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); - var notInactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("notInactivityTimeout", 5000L)); - - var request = TimeseriesSaveRequest.builder() - .tenantId(tenantId) - .customerId(customerId) - .entityId(deviceId) - .entry(notInactivityTimeout) - .strategy(new TimeseriesSaveRequest.Strategy(false, true, false, false)) - .build(); - - given(tsService.saveLatest(tenantId, deviceId, List.of(notInactivityTimeout))).willReturn(immediateFuture(TimeseriesSaveResult.of(1, listOfNNumbers(1)))); - - // WHEN - telemetryService.saveTimeseries(request); - - // THEN - then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any()); - } - - @Test - void shouldNotNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesSaveFailed() { - // GIVEN - var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); - var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L)); - - var request = TimeseriesSaveRequest.builder() - .tenantId(tenantId) - .customerId(customerId) - .entityId(deviceId) - .entry(inactivityTimeout) - .strategy(new TimeseriesSaveRequest.Strategy(false, true, false, false)) - .build(); - - given(tsService.saveLatest(tenantId, deviceId, List.of(inactivityTimeout))).willReturn(immediateFailedFuture(new RuntimeException("failed to save"))); - - // WHEN - telemetryService.saveTimeseries(request); - - // THEN - then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any()); - } - // used to emulate sequence numbers returned by save latest API private static List listOfNNumbers(int N) { return LongStream.range(0, N).boxed().toList();