From 84c22dc4a3050dd35662bb0146b39a18c8ac44de Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Fri, 8 Dec 2023 13:53:29 +0200 Subject: [PATCH] Add 'all' reporting strategy for integrations --- .../AllIntegrationActivityManager.java | 122 ++++++++++++++++++ .../FirstOnlyIntegrationActivityManager.java | 2 +- .../LastOnlyIntegrationActivityManager.java | 2 +- .../src/main/resources/thingsboard.yml | 3 +- 4 files changed, 126 insertions(+), 3 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/integration/activity/AllIntegrationActivityManager.java diff --git a/application/src/main/java/org/thingsboard/server/service/integration/activity/AllIntegrationActivityManager.java b/application/src/main/java/org/thingsboard/server/service/integration/activity/AllIntegrationActivityManager.java new file mode 100644 index 0000000000..de9b132896 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/integration/activity/AllIntegrationActivityManager.java @@ -0,0 +1,122 @@ +/** + * ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL + * + * Copyright © 2016-2023 ThingsBoard, Inc. All Rights Reserved. + * + * NOTICE: All information contained herein is, and remains + * the property of ThingsBoard, Inc. and its suppliers, + * if any. The intellectual and technical concepts contained + * herein are proprietary to ThingsBoard, Inc. + * and its suppliers and may be covered by U.S. and Foreign Patents, + * patents in process, and are protected by trade secret or copyright law. + * + * Dissemination of this information or reproduction of this material is strictly forbidden + * unless prior written permission is obtained from COMPANY. + * + * Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees, + * managers or contractors who have executed Confidentiality and Non-disclosure agreements + * explicitly covering such access. + * + * The copyright notice above does not evidence any actual or intended publication + * or disclosure of this source code, which includes + * information that is confidential and/or proprietary, and is a trade secret, of COMPANY. + * ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, + * OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT + * THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED, + * AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES. + * THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION + * DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, + * OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART. + */ +package org.thingsboard.server.service.integration.activity; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.util.Pair; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.transport.activity.AbstractActivityManager; +import org.thingsboard.server.common.transport.activity.ActivityReportCallback; +import org.thingsboard.server.common.transport.activity.ActivityState; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Supplier; + +@Slf4j +@Component +@ConditionalOnProperty(prefix = "integrations.activity", value = "reporting_strategy", havingValue = "all") +public class AllIntegrationActivityManager extends AbstractActivityManager { + + private final ConcurrentMap states = new ConcurrentHashMap<>(); + + @Override + protected void doOnActivity(IntegrationActivityKey activityKey, Supplier newStateSupplier) { + long newLastRecordedTime = System.currentTimeMillis(); + SettableFuture> reportCompletedFuture = SettableFuture.create(); + states.compute(activityKey, (key, activityState) -> { + if (activityState == null) { + activityState = newStateSupplier.get(); + } + if (activityState.getLastRecordedTime() < newLastRecordedTime) { + activityState.setLastRecordedTime(newLastRecordedTime); + } + if (activityState.getLastReportedTime() < activityState.getLastRecordedTime()) { + log.debug("[{}][{}] Going to report activity event for device with id: [{}].", + activityKey.getTenantId().getId(), name, activityKey.getDeviceId().getId()); + reporter.report(key, activityState.getLastRecordedTime(), activityState, new ActivityReportCallback<>() { + @Override + public void onSuccess(IntegrationActivityKey key, long reportedTime) { + reportCompletedFuture.set(Pair.of(key, reportedTime)); + } + + @Override + public void onFailure(IntegrationActivityKey key, Throwable t) { + reportCompletedFuture.setException(t); + } + }); + } + return activityState; + }); + Futures.addCallback(reportCompletedFuture, new FutureCallback<>() { + @Override + public void onSuccess(Pair reportResult) { + updateLastReportedTime(reportResult.getFirst(), reportResult.getSecond()); + } + + @Override + public void onFailure(@NonNull Throwable t) { + log.debug("[{}][{}] Failed to report activity event for device with id: [{}].", + name, activityKey.getTenantId().getId(), activityKey.getDeviceId().getId()); + } + }, MoreExecutors.directExecutor()); + } + + private void updateLastReportedTime(IntegrationActivityKey key, long newLastReportedTime) { + states.computeIfPresent(key, (__, activityState) -> { + activityState.setLastReportedTime(Math.max(activityState.getLastReportedTime(), newLastReportedTime)); + return activityState; + }); + } + + @Override + protected void doOnReportingPeriodEnd() { + for (Map.Entry entry : states.entrySet()) { + var activityKey = entry.getKey(); + var activityState = entry.getValue(); + // if there were no activities during the reporting period, we should remove the entry to prevent memory leaks + long expirationTime = System.currentTimeMillis() - reportingPeriodMillis; + if (activityState.getLastRecordedTime() < expirationTime) { + log.debug("[{}][{}] No activity events were received during reporting period for device with id: [{}]. Going to remove activity state.", + activityKey.getTenantId().getId(), name, activityKey.getDeviceId().getId()); + states.remove(activityKey); + } + } + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/integration/activity/FirstOnlyIntegrationActivityManager.java b/application/src/main/java/org/thingsboard/server/service/integration/activity/FirstOnlyIntegrationActivityManager.java index 35567847a5..239fd5f19e 100644 --- a/application/src/main/java/org/thingsboard/server/service/integration/activity/FirstOnlyIntegrationActivityManager.java +++ b/application/src/main/java/org/thingsboard/server/service/integration/activity/FirstOnlyIntegrationActivityManager.java @@ -106,7 +106,7 @@ public class FirstOnlyIntegrationActivityManager extends AbstractActivityManager @Override public void onFailure(@NonNull Throwable t) { - log.debug("[{}][{}] Failed to report activity event for device with id: [{}].", + log.debug("[{}][{}] Failed to report first activity event for device with id: [{}].", name, activityKey.getTenantId().getId(), activityKey.getDeviceId().getId()); } }, MoreExecutors.directExecutor()); diff --git a/application/src/main/java/org/thingsboard/server/service/integration/activity/LastOnlyIntegrationActivityManager.java b/application/src/main/java/org/thingsboard/server/service/integration/activity/LastOnlyIntegrationActivityManager.java index 6517a1af2a..f29b81baf8 100644 --- a/application/src/main/java/org/thingsboard/server/service/integration/activity/LastOnlyIntegrationActivityManager.java +++ b/application/src/main/java/org/thingsboard/server/service/integration/activity/LastOnlyIntegrationActivityManager.java @@ -65,12 +65,12 @@ public class LastOnlyIntegrationActivityManager extends AbstractActivityManager< @Override public void doOnReportingPeriodEnd() { - long expirationTime = System.currentTimeMillis() - reportingPeriodMillis; for (Map.Entry entry : states.entrySet()) { var activityKey = entry.getKey(); var activityState = entry.getValue(); long lastRecordedTime = activityState.getLastRecordedTime(); // if there were no activities during the reporting period, we should remove the entry to prevent memory leaks + long expirationTime = System.currentTimeMillis() - reportingPeriodMillis; if (lastRecordedTime < expirationTime) { log.debug("[{}][{}] No activity events were received during reporting period for device with id: [{}]. Going to remove activity state.", activityKey.getTenantId().getId(), name, activityKey.getDeviceId().getId()); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index e25d2ce3ba..7baf6dbf9b 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -874,7 +874,8 @@ transport: # - 'first': Only the first activity event in each reporting period is reported. # - 'last': Only the last activity event in the reporting period is reported. # - 'first-and-last': Both the first and last activity events in the reporting period are reported. - reporting_strategy: "${TB_TRANSPORT_ACTIVITY_REPORTING_STRATEGY:last}" + # - 'all': All activity events in the reporting period are reported. + reporting_strategy: "${TB_TRANSPORT_ACTIVITY_REPORTING_STRATEGY:all}" json: # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"