diff --git a/application/src/main/java/org/thingsboard/server/service/integration/activity/FirstOnlyIntegrationActivityManager.java b/application/src/main/java/org/thingsboard/server/service/integration/activity/FirstOnlyIntegrationActivityManager.java index 239fd5f19e..ae03c1fb94 100644 --- a/application/src/main/java/org/thingsboard/server/service/integration/activity/FirstOnlyIntegrationActivityManager.java +++ b/application/src/main/java/org/thingsboard/server/service/integration/activity/FirstOnlyIntegrationActivityManager.java @@ -115,31 +115,33 @@ public class FirstOnlyIntegrationActivityManager extends AbstractActivityManager @Override protected void doOnReportingPeriodEnd() { for (Map.Entry entry : states.entrySet()) { - var activityKey = entry.getKey(); var activityStateWrapper = entry.getValue(); + if (activityStateWrapper.isAlreadyBeenReported()) { + activityStateWrapper.setAlreadyBeenReported(false); + continue; + } + var activityKey = entry.getKey(); var activityState = activityStateWrapper.getState(); long lastRecordedTime = activityState.getLastRecordedTime(); // if there were no activities during the reporting period, we should remove the entry to prevent memory leaks - if (!activityStateWrapper.isAlreadyBeenReported()) { - log.debug("[{}][{}] No activity events were received during reporting period for device with id: [{}]. Going to remove activity state.", - activityKey.getTenantId().getId(), name, activityKey.getDeviceId().getId()); - states.remove(activityKey); - // report leftover events - if (activityState.getLastReportedTime() < lastRecordedTime) { - log.debug("[{}][{}] Going to report leftover activity event for device with id: [{}].", activityKey.getTenantId().getId(), name, activityKey.getDeviceId().getId()); - reporter.report(activityKey, lastRecordedTime, activityState, new ActivityReportCallback<>() { - @Override - public void onSuccess(IntegrationActivityKey key, long reportedTime) { - updateLastReportedTime(key, reportedTime); // just in case the same key was added again - } + log.debug("[{}][{}] No activity events were received during reporting period for device with id: [{}]. Going to remove activity state.", + activityKey.getTenantId().getId(), name, activityKey.getDeviceId().getId()); + states.remove(activityKey); + // report leftover events + if (activityState.getLastReportedTime() < lastRecordedTime) { + log.debug("[{}][{}] Going to report leftover activity event for device with id: [{}].", activityKey.getTenantId().getId(), name, activityKey.getDeviceId().getId()); + reporter.report(activityKey, lastRecordedTime, activityState, new ActivityReportCallback<>() { + @Override + public void onSuccess(IntegrationActivityKey key, long reportedTime) { + updateLastReportedTime(key, reportedTime); // just in case the same key was added again + } - @Override - public void onFailure(IntegrationActivityKey key, Throwable t) { - log.debug("[{}][{}] Failed to report last activity event in a period for device with id: [{}].", - activityKey.getTenantId().getId(), name, activityKey.getDeviceId().getId()); - } - }); - } + @Override + public void onFailure(IntegrationActivityKey key, Throwable t) { + log.debug("[{}][{}] Failed to report last activity event in a period for device with id: [{}].", + activityKey.getTenantId().getId(), name, activityKey.getDeviceId().getId()); + } + }); } activityStateWrapper.setAlreadyBeenReported(false); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index be52ee1f0d..57e55f4eda 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -785,7 +785,9 @@ state: # If 'persistToTelemetry' is changed from 'false' to 'true': 'CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_ts_view;' # If 'persistToTelemetry' is changed from 'true' to 'false': 'CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_attribute_view;' persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:false}" - # Time-to-live for device state telemetry data (e.g. 'active', 'lastActivityTime'). Used only when state.persistToTelemetry is set to 'true'. + # Millisecond value defining time-to-live for device state telemetry data (e.g. 'active', 'lastActivityTime'). + # Used only when state.persistToTelemetry is set to 'true' and Cassandra is used for timeseries data. + # 0 means time-to-live mechanism is disabled. telemetryTtl: "${STATE_TELEMETRY_TTL:0}" # Tbel parameters @@ -870,11 +872,10 @@ transport: report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}" activity: # This property specifies the strategy for reporting activity events within each reporting period. - # The accepted values are 'first', 'last', and 'first and last'. + # The accepted values are 'first', 'last', and 'first-and-last'. # - 'first': Only the first activity event in each reporting period is reported. # - 'last': Only the last activity event in the reporting period is reported. # - 'first-and-last': Both the first and last activity events in the reporting period are reported. - # - 'all': All activity events in the reporting period are reported. reporting_strategy: "${TB_TRANSPORT_ACTIVITY_REPORTING_STRATEGY:last}" json: # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index d2fa8ad8ca..a86bdda312 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -136,6 +136,7 @@ public class DefaultTransportService implements TransportService { public static final String OVERWRITE_ACTIVITY_TIME = "overwriteActivityTime"; public static final String SESSION_EXPIRED_MESSAGE = "Session has expired due to last activity time!"; + private static final String ACTIVITY_MANAGER_NAME = "transport-activity-manager"; public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_OPEN = getSessionEventMsg(TransportProtos.SessionEvent.OPEN); public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_CLOSED = getSessionEventMsg(TransportProtos.SessionEvent.CLOSED); public static final TransportProtos.SessionCloseNotificationProto SESSION_EXPIRED_NOTIFICATION_PROTO = TransportProtos.SessionCloseNotificationProto.newBuilder() @@ -187,7 +188,6 @@ public class DefaultTransportService implements TransportService { private final NotificationRuleProcessor notificationRuleProcessor; private final EntityLimitsCache entityLimitsCache; private ActivityManager activityManager; - private static final String ACTIVITY_MANAGER_NAME = "transport-activity-manager"; protected TbQueueRequestTemplate, TbProtoQueueMsg> transportApiRequestTemplate; protected TbQueueProducer> ruleEngineMsgProducer; @@ -260,7 +260,7 @@ public class DefaultTransportService implements TransportService { } private final ActivityStateReporter activityReporter = (sessionId, timeToReport, state, reportCallback) -> { - log.debug("[{}] Reporting activity state for key: [{}]. Time to report: [{}].", ACTIVITY_MANAGER_NAME, sessionId, timeToReport); + log.debug("[{}] Reporting activity state for session with id: [{}]. Time to report: [{}].", ACTIVITY_MANAGER_NAME, sessionId, timeToReport); SessionMetaData sessionMetaData = sessions.get(sessionId); TransportProtos.SubscriptionInfoProto subscriptionInfo = TransportProtos.SubscriptionInfoProto.newBuilder() .setAttributeSubscription(sessionMetaData != null && sessionMetaData.isSubscribedToAttributes()) diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstAndLastTransportActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstAndLastTransportActivityManager.java index db0ebc7ddb..f14d958307 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstAndLastTransportActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstAndLastTransportActivityManager.java @@ -49,9 +49,7 @@ import org.thingsboard.server.common.transport.service.SessionMetaData; import org.thingsboard.server.common.transport.service.TransportActivityState; import org.thingsboard.server.gen.transport.TransportProtos; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -129,7 +127,6 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage @Override protected void doOnReportingPeriodEnd() { - Set statesToRemove = new HashSet<>(); for (Map.Entry entry : states.entrySet()) { var sessionId = entry.getKey(); var activityStateWrapper = entry.getValue(); @@ -139,8 +136,8 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage if (sessionMetaData != null) { activityState.setSessionInfoProto(sessionMetaData.getSessionInfo()); } else { - log.debug("[{}] Session with id: [{}] is not present. Marking it's activity state for removal.", name, sessionId); - statesToRemove.add(sessionId); + log.debug("[{}] Session with id: [{}] is not present. Removing it's activity state.", name, sessionId); + states.remove(sessionId); } long lastActivityTime = activityState.getLastRecordedTime(); @@ -161,8 +158,8 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout; boolean hasExpired = sessionMetaData != null && lastActivityTime < expirationTime; if (hasExpired) { - log.debug("[{}] Session with id: [{}] has expired due to last activity time: [{}]. Marking it's activity state for removal.", name, sessionId, lastActivityTime); - statesToRemove.add(sessionId); + log.debug("[{}] Session with id: [{}] has expired due to last activity time: [{}]. Removing it's activity state.", name, sessionId, lastActivityTime); + states.remove(sessionId); transportService.deregisterSession(sessionInfo); transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null); sessionMetaData.getListener().onRemoteSessionCloseCommand(sessionId, SESSION_EXPIRED_NOTIFICATION_PROTO); @@ -183,7 +180,6 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage } activityStateWrapper.setAlreadyBeenReported(false); } - statesToRemove.forEach(states::remove); } private void updateLastReportedTime(UUID key, long newLastReportedTime) { diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstOnlyTransportActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstOnlyTransportActivityManager.java index b0762e0b5e..34378caeb6 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstOnlyTransportActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstOnlyTransportActivityManager.java @@ -49,9 +49,7 @@ import org.thingsboard.server.common.transport.service.SessionMetaData; import org.thingsboard.server.common.transport.service.TransportActivityState; import org.thingsboard.server.gen.transport.TransportProtos; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -129,7 +127,6 @@ public class FirstOnlyTransportActivityManager extends AbstractActivityManager statesToRemove = new HashSet<>(); for (Map.Entry entry : states.entrySet()) { var sessionId = entry.getKey(); var activityStateWrapper = entry.getValue(); @@ -139,8 +136,8 @@ public class FirstOnlyTransportActivityManager extends AbstractActivityManager statesToRemove = new HashSet<>(); for (Map.Entry entry : states.entrySet()) { var sessionId = entry.getKey(); var activityState = entry.getValue(); @@ -91,8 +88,8 @@ public class LastOnlyTransportActivityManager extends AbstractActivityManager