Ability to limit alarm queries invocation count

This commit is contained in:
Andrii Shvaika 2021-12-14 18:13:26 +02:00
parent 4bc2eba8a2
commit 7de1343df0
5 changed files with 42 additions and 10 deletions

View File

@ -36,16 +36,13 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.AlarmDataQuery; import org.thingsboard.server.common.data.query.AlarmDataQuery;
import org.thingsboard.server.common.data.query.EntityData; import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataPageLink;
import org.thingsboard.server.common.data.query.EntityDataQuery; import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityDataSortOrder;
import org.thingsboard.server.common.data.query.EntityKey; import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.TsValue; import org.thingsboard.server.common.data.query.TsValue;
import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
@ -68,7 +65,6 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
@ -83,8 +79,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Slf4j @Slf4j
@ -133,6 +127,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
private int maxEntitiesPerDataSubscription; private int maxEntitiesPerDataSubscription;
@Value("${server.ws.max_entities_per_alarm_subscription:1000}") @Value("${server.ws.max_entities_per_alarm_subscription:1000}")
private int maxEntitiesPerAlarmSubscription; private int maxEntitiesPerAlarmSubscription;
@Value("${server.ws.max_alarm_queries_per_refresh_interval:3}")
private int maxAlarmQueriesPerRefreshInterval;
private ExecutorService wsCallBackExecutor; private ExecutorService wsCallBackExecutor;
private boolean tsInSqlDB; private boolean tsInSqlDB;
@ -282,7 +278,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
if (adq.getPageLink().getTimeWindow() > 0) { if (adq.getPageLink().getTimeWindow() > 0) {
TbAlarmDataSubCtx finalCtx = ctx; TbAlarmDataSubCtx finalCtx = ctx;
ScheduledFuture<?> task = scheduler.scheduleWithFixedDelay( ScheduledFuture<?> task = scheduler.scheduleWithFixedDelay(
finalCtx::cleanupOldAlarms, dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS); finalCtx::checkAndResetInvocationCounter, dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS);
finalCtx.setRefreshTask(task); finalCtx.setRefreshTask(task);
} }
} }
@ -345,7 +341,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
private TbAlarmDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, AlarmDataCmd cmd) { private TbAlarmDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, AlarmDataCmd cmd) {
Map<Integer, TbAbstractSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>()); Map<Integer, TbAbstractSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>());
TbAlarmDataSubCtx ctx = new TbAlarmDataSubCtx(serviceId, wsService, entityService, localSubscriptionService, TbAlarmDataSubCtx ctx = new TbAlarmDataSubCtx(serviceId, wsService, entityService, localSubscriptionService,
attributesService, stats, alarmService, sessionRef, cmd.getCmdId(), maxEntitiesPerAlarmSubscription); attributesService, stats, alarmService, sessionRef, cmd.getCmdId(), maxEntitiesPerAlarmSubscription,
maxAlarmQueriesPerRefreshInterval);
ctx.setAndResolveQuery(cmd.getQuery()); ctx.setAndResolveQuery(cmd.getQuery());
sessionSubs.put(cmd.getCmdId(), ctx); sessionSubs.put(cmd.getCmdId(), ctx);
return ctx; return ctx;

View File

@ -57,6 +57,7 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
this.subToEntityIdMap = new ConcurrentHashMap<>(); this.subToEntityIdMap = new ConcurrentHashMap<>();
} }
@Override
public void fetchData() { public void fetchData() {
this.data = findEntityData(); this.data = findEntityData();
} }
@ -71,6 +72,7 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
return result; return result;
} }
@Override
protected synchronized void update() { protected synchronized void update() {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
PageData<EntityData> newData = findEntityData(); PageData<EntityData> newData = findEntityData();

View File

@ -68,6 +68,8 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
private final int maxEntitiesPerAlarmSubscription; private final int maxEntitiesPerAlarmSubscription;
private final int maxAlarmQueriesPerRefreshInterval;
@Getter @Getter
@Setter @Setter
private PageData<AlarmData> alarms; private PageData<AlarmData> alarms;
@ -75,18 +77,30 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
@Setter @Setter
private boolean tooManyEntities; private boolean tooManyEntities;
private int alarmInvocationAttempts;
public TbAlarmDataSubCtx(String serviceId, TelemetryWebSocketService wsService, public TbAlarmDataSubCtx(String serviceId, TelemetryWebSocketService wsService,
EntityService entityService, TbLocalSubscriptionService localSubscriptionService, EntityService entityService, TbLocalSubscriptionService localSubscriptionService,
AttributesService attributesService, SubscriptionServiceStatistics stats, AlarmService alarmService, AttributesService attributesService, SubscriptionServiceStatistics stats, AlarmService alarmService,
TelemetryWebSocketSessionRef sessionRef, int cmdId, int maxEntitiesPerAlarmSubscription) { TelemetryWebSocketSessionRef sessionRef, int cmdId,
int maxEntitiesPerAlarmSubscription, int maxAlarmQueriesPerRefreshInterval) {
super(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, sessionRef, cmdId); super(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, sessionRef, cmdId);
this.maxEntitiesPerAlarmSubscription = maxEntitiesPerAlarmSubscription; this.maxEntitiesPerAlarmSubscription = maxEntitiesPerAlarmSubscription;
this.maxAlarmQueriesPerRefreshInterval = maxAlarmQueriesPerRefreshInterval;
this.alarmService = alarmService; this.alarmService = alarmService;
this.entitiesMap = new LinkedHashMap<>(); this.entitiesMap = new LinkedHashMap<>();
this.alarmsMap = new HashMap<>(); this.alarmsMap = new HashMap<>();
} }
public void fetchAlarms() { public void fetchAlarms() {
alarmInvocationAttempts++;
log.trace("[{}] Fetching alarms: {}", cmdId, alarmInvocationAttempts);
if (alarmInvocationAttempts <= maxAlarmQueriesPerRefreshInterval) {
doFetchAlarms();
}
}
private void doFetchAlarms() {
AlarmDataUpdate update; AlarmDataUpdate update;
if (!entitiesMap.isEmpty()) { if (!entitiesMap.isEmpty()) {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
@ -103,6 +117,8 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
} }
public void fetchData() { public void fetchData() {
resetInvocationCounter();
log.trace("[{}] Fetching data: {}", cmdId, alarmInvocationAttempts);
super.fetchData(); super.fetchData();
entitiesMap.clear(); entitiesMap.clear();
tooManyEntities = data.hasNext(); tooManyEntities = data.hasNext();
@ -222,7 +238,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
} }
} }
if (shouldRefresh) { if (shouldRefresh) {
fetchAlarms(); doFetchAlarms();
} }
} }
@ -256,8 +272,19 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
return true; return true;
} }
public synchronized void checkAndResetInvocationCounter() {
boolean fetchNeeded = this.alarmInvocationAttempts > maxAlarmQueriesPerRefreshInterval;
resetInvocationCounter();
if (fetchNeeded) {
fetchAlarms();
} else {
cleanupOldAlarms();
}
}
@Override @Override
protected synchronized void doUpdate(Map<EntityId, EntityData> newDataMap) { protected synchronized void doUpdate(Map<EntityId, EntityData> newDataMap) {
resetInvocationCounter();
entitiesMap.clear(); entitiesMap.clear();
tooManyEntities = data.hasNext(); tooManyEntities = data.hasNext();
for (EntityData entityData : data.getData()) { for (EntityData entityData : data.getData()) {
@ -295,6 +322,10 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
subsToAdd.forEach(localSubscriptionService::addSubscription); subsToAdd.forEach(localSubscriptionService::addSubscription);
} }
private void resetInvocationCounter() {
alarmInvocationAttempts = 0;
}
@Override @Override
protected EntityDataQuery buildEntityDataQuery() { protected EntityDataQuery buildEntityDataQuery() {
EntityDataSortOrder sortOrder = query.getPageLink().getSortOrder(); EntityDataSortOrder sortOrder = query.getPageLink().getSortOrder();

View File

@ -165,6 +165,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
return data.getData().stream().filter(item -> item.getEntityId().equals(entityId)).findFirst().orElse(null); return data.getData().stream().filter(item -> item.getEntityId().equals(entityId)).findFirst().orElse(null);
} }
@Override
public synchronized void doUpdate(Map<EntityId, EntityData> newDataMap) { public synchronized void doUpdate(Map<EntityId, EntityData> newDataMap) {
List<Integer> subIdsToCancel = new ArrayList<>(); List<Integer> subIdsToCancel = new ArrayList<>();
List<TbSubscription> subsToAdd = new ArrayList<>(); List<TbSubscription> subsToAdd = new ArrayList<>();

View File

@ -70,6 +70,7 @@ server:
dynamic_page_link: dynamic_page_link:
refresh_interval: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_INTERVAL_SEC:60}" refresh_interval: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_INTERVAL_SEC:60}"
refresh_pool_size: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_POOL_SIZE:1}" refresh_pool_size: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_POOL_SIZE:1}"
max_alarm_queries_per_refresh_interval: "${TB_SERVER_WS_MAX_ALARM_QUERIES_PER_REFRESH_INTERVAL:3}"
max_per_user: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_MAX_PER_USER:10}" max_per_user: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_MAX_PER_USER:10}"
max_entities_per_data_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_DATA_SUBSCRIPTION:10000}" max_entities_per_data_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_DATA_SUBSCRIPTION:10000}"
max_entities_per_alarm_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_ALARM_SUBSCRIPTION:10000}" max_entities_per_alarm_subscription: "${TB_SERVER_WS_MAX_ENTITIES_PER_ALARM_SUBSCRIPTION:10000}"