diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 127de752d0..4ee8708805 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -36,16 +36,13 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.query.AlarmDataQuery; import org.thingsboard.server.common.data.query.EntityData; -import org.thingsboard.server.common.data.query.EntityDataPageLink; import org.thingsboard.server.common.data.query.EntityDataQuery; -import org.thingsboard.server.common.data.query.EntityDataSortOrder; import org.thingsboard.server.common.data.query.EntityKey; import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.common.data.query.TsValue; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.entity.EntityService; -import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.util.TbCoreComponent; @@ -68,7 +65,6 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -83,8 +79,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @Slf4j @@ -133,6 +127,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc private int maxEntitiesPerDataSubscription; @Value("${server.ws.max_entities_per_alarm_subscription:1000}") private int maxEntitiesPerAlarmSubscription; + @Value("${server.ws.max_alarm_queries_per_refresh_interval:3}") + private int maxAlarmQueriesPerRefreshInterval; private ExecutorService wsCallBackExecutor; private boolean tsInSqlDB; @@ -282,7 +278,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc if (adq.getPageLink().getTimeWindow() > 0) { TbAlarmDataSubCtx finalCtx = ctx; ScheduledFuture task = scheduler.scheduleWithFixedDelay( - finalCtx::cleanupOldAlarms, dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS); + finalCtx::checkAndResetInvocationCounter, dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS); finalCtx.setRefreshTask(task); } } @@ -345,7 +341,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc private TbAlarmDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, AlarmDataCmd cmd) { Map sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>()); TbAlarmDataSubCtx ctx = new TbAlarmDataSubCtx(serviceId, wsService, entityService, localSubscriptionService, - attributesService, stats, alarmService, sessionRef, cmd.getCmdId(), maxEntitiesPerAlarmSubscription); + attributesService, stats, alarmService, sessionRef, cmd.getCmdId(), maxEntitiesPerAlarmSubscription, + maxAlarmQueriesPerRefreshInterval); ctx.setAndResolveQuery(cmd.getQuery()); sessionSubs.put(cmd.getCmdId(), ctx); return ctx; diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java index e054c9436f..1d5f8eb603 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java @@ -57,6 +57,7 @@ public abstract class TbAbstractDataSubCtx(); } + @Override public void fetchData() { this.data = findEntityData(); } @@ -71,6 +72,7 @@ public abstract class TbAbstractDataSubCtx newData = findEntityData(); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java index c3343effb5..1ea6495517 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java @@ -68,6 +68,8 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { private final int maxEntitiesPerAlarmSubscription; + private final int maxAlarmQueriesPerRefreshInterval; + @Getter @Setter private PageData alarms; @@ -75,18 +77,30 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { @Setter private boolean tooManyEntities; + private int alarmInvocationAttempts; + public TbAlarmDataSubCtx(String serviceId, TelemetryWebSocketService wsService, EntityService entityService, TbLocalSubscriptionService localSubscriptionService, AttributesService attributesService, SubscriptionServiceStatistics stats, AlarmService alarmService, - TelemetryWebSocketSessionRef sessionRef, int cmdId, int maxEntitiesPerAlarmSubscription) { + TelemetryWebSocketSessionRef sessionRef, int cmdId, + int maxEntitiesPerAlarmSubscription, int maxAlarmQueriesPerRefreshInterval) { super(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, sessionRef, cmdId); this.maxEntitiesPerAlarmSubscription = maxEntitiesPerAlarmSubscription; + this.maxAlarmQueriesPerRefreshInterval = maxAlarmQueriesPerRefreshInterval; this.alarmService = alarmService; this.entitiesMap = new LinkedHashMap<>(); this.alarmsMap = new HashMap<>(); } public void fetchAlarms() { + alarmInvocationAttempts++; + log.trace("[{}] Fetching alarms: {}", cmdId, alarmInvocationAttempts); + if (alarmInvocationAttempts <= maxAlarmQueriesPerRefreshInterval) { + doFetchAlarms(); + } + } + + private void doFetchAlarms() { AlarmDataUpdate update; if (!entitiesMap.isEmpty()) { long start = System.currentTimeMillis(); @@ -103,6 +117,8 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { } public void fetchData() { + resetInvocationCounter(); + log.trace("[{}] Fetching data: {}", cmdId, alarmInvocationAttempts); super.fetchData(); entitiesMap.clear(); tooManyEntities = data.hasNext(); @@ -222,7 +238,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { } } if (shouldRefresh) { - fetchAlarms(); + doFetchAlarms(); } } @@ -256,8 +272,19 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { return true; } + public synchronized void checkAndResetInvocationCounter() { + boolean fetchNeeded = this.alarmInvocationAttempts > maxAlarmQueriesPerRefreshInterval; + resetInvocationCounter(); + if (fetchNeeded) { + fetchAlarms(); + } else { + cleanupOldAlarms(); + } + } + @Override protected synchronized void doUpdate(Map newDataMap) { + resetInvocationCounter(); entitiesMap.clear(); tooManyEntities = data.hasNext(); for (EntityData entityData : data.getData()) { @@ -295,6 +322,10 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { subsToAdd.forEach(localSubscriptionService::addSubscription); } + private void resetInvocationCounter() { + alarmInvocationAttempts = 0; + } + @Override protected EntityDataQuery buildEntityDataQuery() { EntityDataSortOrder sortOrder = query.getPageLink().getSortOrder(); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java index 7325fe7878..d31456ff6c 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java @@ -165,6 +165,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx { return data.getData().stream().filter(item -> item.getEntityId().equals(entityId)).findFirst().orElse(null); } + @Override public synchronized void doUpdate(Map newDataMap) { List subIdsToCancel = new ArrayList<>(); List subsToAdd = new ArrayList<>(); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 8f02a7cb02..e0f45be7dc 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -70,6 +70,7 @@ server: dynamic_page_link: refresh_interval: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_INTERVAL_SEC:60}" refresh_pool_size: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_POOL_SIZE:1}" + max_alarm_queries_per_refresh_interval: "${TB_SERVER_WS_MAX_ALARM_QUERIES_PER_REFRESH_INTERVAL:3}" max_per_user: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_MAX_PER_USER:10}" max_entities_per_data_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_DATA_SUBSCRIPTION:10000}" max_entities_per_alarm_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_ALARM_SUBSCRIPTION:10000}"