From 44f00eb011e9e2c07744fbd345ad7ed422e4e60d Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Wed, 24 Jun 2020 17:27:12 +0300 Subject: [PATCH] Improvements to WS API --- ...efaultTbEntityDataSubscriptionService.java | 257 +++++++----------- .../subscription/TbEntityDataSubCtx.java | 135 +++++++-- .../TbEntityDataSubscriptionService.java | 9 - .../DefaultTelemetryWebSocketService.java | 21 +- .../telemetry/cmd/v2/EntityHistoryCmd.java | 1 + .../src/main/resources/thingsboard.yml | 2 + .../common/data/kv/BaseReadTsKvQuery.java | 10 +- .../server/common/data/kv/ReadTsKvQuery.java | 2 +- .../common/data/query/EntityDataPageLink.java | 6 +- ...stractChunkedAggregationTimeseriesDao.java | 2 +- .../timescale/TimescaleTimeseriesDao.java | 2 +- .../dao/timeseries/BaseTimeseriesService.java | 2 +- .../CassandraBaseTimeseriesDao.java | 2 +- .../dao/timeseries/TsKvQueryCursor.java | 2 +- 14 files changed, 244 insertions(+), 209 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 2252211bd1..1772735f97 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -19,17 +19,16 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; -import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; -import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.id.CustomerId; -import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; @@ -40,19 +39,12 @@ import org.thingsboard.server.common.data.query.EntityDataQuery; import org.thingsboard.server.common.data.query.EntityKey; import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.common.data.query.TsValue; -import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.timeseries.TimeseriesService; -import org.thingsboard.server.queue.discovery.ClusterTopologyChangeEvent; -import org.thingsboard.server.queue.discovery.PartitionChangeEvent; -import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.util.TbCoreComponent; -import org.thingsboard.server.service.queue.TbClusterService; -import org.thingsboard.server.service.security.permission.Operation; +import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService; import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; @@ -63,12 +55,14 @@ import org.thingsboard.server.service.telemetry.cmd.v2.EntityHistoryCmd; import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd; import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd; import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode; -import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -79,6 +73,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @Slf4j @@ -87,28 +87,14 @@ import java.util.stream.Collectors; public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubscriptionService { private static final int DEFAULT_LIMIT = 100; - private final Set currentPartitions = ConcurrentHashMap.newKeySet(); private final Map> subscriptionsBySessionId = new ConcurrentHashMap<>(); @Autowired private TelemetryWebSocketService wsService; - @Autowired - private EntityViewService entityViewService; - @Autowired private EntityService entityService; - @Autowired - private PartitionService partitionService; - - @Autowired - private TbClusterService clusterService; - - @Autowired - @Lazy - private SubscriptionManagerService subscriptionManagerService; - @Autowired @Lazy private TbLocalSubscriptionService localSubscriptionService; @@ -119,18 +105,38 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @Autowired private TbServiceInfoProvider serviceInfoProvider; + @Autowired + @Getter + private DbCallbackExecutorService dbCallbackExecutor; + + private ScheduledExecutorService scheduler; + @Value("${database.ts.type}") private String databaseTsType; + @Value("${server.ws.dynamic_page_link_refresh_interval:6}") + private long dynamicPageLinkRefreshInterval; + @Value("${server.ws.dynamic_page_link_refresh_pool_size:1}") + private int dynamicPageLinkRefreshPoolSize; private ExecutorService wsCallBackExecutor; private boolean tsInSqlDB; private String serviceId; + private AtomicInteger regularQueryInvocationCnt = new AtomicInteger(); + private AtomicInteger dynamicQueryInvocationCnt = new AtomicInteger(); + private AtomicLong regularQueryTimeSpent = new AtomicLong(); + private AtomicLong dynamicQueryTimeSpent = new AtomicLong(); @PostConstruct public void initExecutor() { serviceId = serviceInfoProvider.getServiceId(); wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ws-entity-sub-callback")); tsInSqlDB = databaseTsType.equalsIgnoreCase("sql") || databaseTsType.equalsIgnoreCase("timescale"); + ThreadFactory tbThreadFactory = ThingsBoardThreadFactory.forName("ws-entity-sub-scheduler"); + if (dynamicPageLinkRefreshPoolSize == 1) { + scheduler = Executors.newSingleThreadScheduledExecutor(tbThreadFactory); + } else { + scheduler = Executors.newScheduledThreadPool(dynamicPageLinkRefreshPoolSize, tbThreadFactory); + } } @PreDestroy @@ -140,45 +146,19 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc } } - @Override - @EventListener(PartitionChangeEvent.class) - public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) { - if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) { - currentPartitions.clear(); - currentPartitions.addAll(partitionChangeEvent.getPartitions()); - } - } - - @Override - @EventListener(ClusterTopologyChangeEvent.class) - public void onApplicationEvent(ClusterTopologyChangeEvent event) { - if (event.getServiceQueueKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals(key.getServiceType()))) { - /* - * If the cluster topology has changed, we need to push all current subscriptions to SubscriptionManagerService again. - * Otherwise, the SubscriptionManagerService may "forget" those subscriptions in case of restart. - * Although this is resource consuming operation, it is cheaper than sending ping/pong commands periodically - * It is also cheaper then caching the subscriptions by entity id and then lookup of those caches every time we have new telemetry in SubscriptionManagerService. - * Even if we cache locally the list of active subscriptions by entity id, it is still time consuming operation to get them from cache - * Since number of subscriptions is usually much less then number of devices that are pushing data. -// */ -// subscriptionsBySessionId.values().forEach(map -> map.values() -// .forEach(sub -> pushSubscriptionToManagerService(sub, false))); - } - } - @Override public void handleCmd(TelemetryWebSocketSessionRef session, EntityDataCmd cmd) { TbEntityDataSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId()); if (ctx != null) { log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd); if (cmd.getLatestCmd() != null || cmd.getTsCmd() != null || cmd.getHistoryCmd() != null) { - Collection oldSubIds = ctx.clearSubscriptions(); - oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId)); + clearSubs(ctx); } } else { log.debug("[{}][{}] Creating new subscription using: {}", session.getSessionId(), cmd.getCmdId(), cmd); ctx = createSubCtx(session, cmd); } + ctx.setCurrentCmd(cmd); if (cmd.getQuery() != null) { if (ctx.getQuery() == null) { log.debug("[{}][{}] Initializing data using query: {}", session.getSessionId(), cmd.getCmdId(), cmd.getQuery()); @@ -197,13 +177,26 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc } }); } + long start = System.currentTimeMillis(); PageData data = entityService.findEntityDataByQuery(tenantId, customerId, ctx.getQuery()); + long end = System.currentTimeMillis(); + regularQueryInvocationCnt.incrementAndGet(); + regularQueryTimeSpent.addAndGet(end - start); + if (log.isTraceEnabled()) { data.getData().forEach(ed -> { log.trace("[{}][{}] EntityData: {}", session.getSessionId(), cmd.getCmdId(), ed); }); } ctx.setData(data); + ctx.cancelRefreshTask(); + if (ctx.getQuery().getPageLink().isDynamic()) { + TbEntityDataSubCtx finalCtx = ctx; + ScheduledFuture task = scheduler.scheduleWithFixedDelay( + () -> refreshDynamicQuery(tenantId, customerId, finalCtx), + dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS); + finalCtx.setRefreshTask(task); + } } ListenableFuture historyFuture; if (cmd.getHistoryCmd() != null) { @@ -233,6 +226,35 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc }, wsCallBackExecutor); } + private void refreshDynamicQuery(TenantId tenantId, CustomerId customerId, TbEntityDataSubCtx finalCtx) { + try { + long start = System.currentTimeMillis(); + Collection oldSubIds = finalCtx.update(entityService.findEntityDataByQuery(tenantId, customerId, finalCtx.getQuery())); + long end = System.currentTimeMillis(); + dynamicQueryInvocationCnt.incrementAndGet(); + dynamicQueryTimeSpent.addAndGet(end - start); + oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId)); + } catch (Exception e) { + log.warn("[{}][{}] Failed to refresh query", finalCtx.getSessionId(), finalCtx.getCmdId(), e); + } + } + + @Scheduled(fixedDelayString = "${server.ws.dynamic_page_link_stats:10000}") + public void printStats() { + int regularQueryInvocationCntValue = regularQueryInvocationCnt.getAndSet(0); + long regularQueryInvocationTimeValue = regularQueryTimeSpent.getAndSet(0); + int dynamicQueryInvocationCntValue = dynamicQueryInvocationCnt.getAndSet(0); + long dynamicQueryInvocationTimeValue = dynamicQueryTimeSpent.getAndSet(0); + long dynamicQueryCnt = subscriptionsBySessionId.values().stream().map(Map::values).count(); + log.info("Stats: regularQueryInvocationCnt = [{}], regularQueryInvocationTime = [{}], dynamicQueryCnt = [{}] dynamicQueryInvocationCnt = [{}], dynamicQueryInvocationTime = [{}]", + regularQueryInvocationCntValue, regularQueryInvocationTimeValue, dynamicQueryCnt, dynamicQueryInvocationCntValue, dynamicQueryInvocationTimeValue); + } + + private void clearSubs(TbEntityDataSubCtx ctx) { + Collection oldSubIds = ctx.clearSubscriptions(); + oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId)); + } + private TbEntityDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, EntityDataCmd cmd) { Map sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>()); TbEntityDataSubCtx ctx = new TbEntityDataSubCtx(serviceId, wsService, sessionRef, cmd.getCmdId()); @@ -282,6 +304,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc } wsService.sendWsMsg(ctx.getSessionId(), update); createSubscriptions(ctx, keys.stream().map(key -> new EntityKey(EntityKeyType.TIME_SERIES, key)).collect(Collectors.toList()), false); + ctx.getData().getData().forEach(ed -> ed.getTimeseries().clear()); } @Override @@ -378,12 +401,21 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc } private ListenableFuture handleHistoryCmd(TbEntityDataSubCtx ctx, EntityHistoryCmd historyCmd) { + List finalTsKvQueryList; List tsKvQueryList = historyCmd.getKeys().stream().map(key -> new BaseReadTsKvQuery( key, historyCmd.getStartTs(), historyCmd.getEndTs(), historyCmd.getInterval(), getLimit(historyCmd.getLimit()), historyCmd.getAgg() )).collect(Collectors.toList()); + if (historyCmd.isFetchLatestPreviousPoint()) { + finalTsKvQueryList = new ArrayList<>(tsKvQueryList); + tsKvQueryList.addAll(historyCmd.getKeys().stream().map(key -> new BaseReadTsKvQuery( + key, historyCmd.getStartTs() - TimeUnit.DAYS.toMillis(365), historyCmd.getStartTs(), historyCmd.getInterval(), 1, historyCmd.getAgg() + )).collect(Collectors.toList())); + } else { + finalTsKvQueryList = tsKvQueryList; + } Map>> fetchResultMap = new HashMap<>(); ctx.getData().getData().forEach(entityData -> fetchResultMap.put(entityData, - tsService.findAll(ctx.getTenantId(), entityData.getEntityId(), tsKvQueryList))); + tsService.findAll(ctx.getTenantId(), entityData.getEntityId(), finalTsKvQueryList))); return Futures.transform(Futures.allAsList(fetchResultMap.values()), f -> { fetchResultMap.forEach((entityData, future) -> { Map> keyData = new LinkedHashMap<>(); @@ -394,6 +426,11 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc entityTsData.forEach(entry -> keyData.get(entry.getKey()).add(new TsValue(entry.getTs(), entry.getValueAsString()))); } keyData.forEach((k, v) -> entityData.getTimeseries().put(k, v.toArray(new TsValue[v.size()]))); + if (historyCmd.isFetchLatestPreviousPoint()) { + entityData.getTimeseries().values().forEach(dataArray -> { + Arrays.sort(dataArray, (o1, o2) -> Long.compare(o2.getTs(), o1.getTs())); + }); + } } catch (InterruptedException | ExecutionException e) { log.warn("[{}][{}][{}] Failed to fetch historical data", ctx.getSessionId(), ctx.getCmdId(), entityData.getEntityId(), e); wsService.sendWsMsg(ctx.getSessionId(), @@ -408,119 +445,29 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData()); } wsService.sendWsMsg(ctx.getSessionId(), update); + ctx.getData().getData().forEach(ed -> ed.getTimeseries().clear()); return ctx; }, wsCallBackExecutor); } @Override public void cancelSubscription(String sessionId, EntityDataUnsubscribeCmd cmd) { - TbEntityDataSubCtx ctx = getSubCtx(sessionId, cmd.getCmdId()); + cleanupAndCancel(getSubCtx(sessionId, cmd.getCmdId())); } -// //TODO 3.1: replace null callbacks with callbacks from websocket service. -// @Override -// public void addSubscription(TbSubscription subscription) { -// EntityId entityId = subscription.getEntityId(); -// // Telemetry subscription on Entity Views are handled differently, because we need to allow only certain keys and time ranges; -// if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW) && TbSubscriptionType.TIMESERIES.equals(subscription.getType())) { -// subscription = resolveEntityViewSubscription((TbTimeseriesSubscription) subscription); -// } -// pushSubscriptionToManagerService(subscription, true); -// registerSubscription(subscription); -// } - -// private void pushSubscriptionToManagerService(TbSubscription subscription, boolean pushToLocalService) { -// TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId()); -// if (currentPartitions.contains(tpi)) { -// // Subscription is managed on the same server; -// if (pushToLocalService) { -// subscriptionManagerService.addSubscription(subscription, TbCallback.EMPTY); -// } -// } else { -// // Push to the queue; -// TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toNewSubscriptionProto(subscription); -// clusterService.pushMsgToCore(tpi, subscription.getEntityId().getId(), toCoreMsg, null); -// } -// } - - @Override - public void onSubscriptionUpdate(String sessionId, SubscriptionUpdate update, TbCallback callback) { -// TbSubscription subscription = subscriptionsBySessionId -// .getOrDefault(sessionId, Collections.emptyMap()).get(update.getSubscriptionId()); -// if (subscription != null) { -// switch (subscription.getType()) { -// case TIMESERIES: -// TbTimeseriesSubscription tsSub = (TbTimeseriesSubscription) subscription; -// update.getLatestValues().forEach((key, value) -> tsSub.getKeyStates().put(key, value)); -// break; -// case ATTRIBUTES: -// TbAttributeSubscription attrSub = (TbAttributeSubscription) subscription; -// update.getLatestValues().forEach((key, value) -> attrSub.getKeyStates().put(key, value)); -// break; -// } -// wsService.sendWsMsg(sessionId, update); -// } -// callback.onSuccess(); + private void cleanupAndCancel(TbEntityDataSubCtx ctx) { + if (ctx != null) { + ctx.cancelRefreshTask(); + clearSubs(ctx); + } } -// @Override -// public void cancelSubscription(String sessionId, int subscriptionId) { -// log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId); -// Map sessionSubscriptions = subscriptionsBySessionId.get(sessionId); -// if (sessionSubscriptions != null) { -// TbSubscription subscription = sessionSubscriptions.remove(subscriptionId); -// if (subscription != null) { -// if (sessionSubscriptions.isEmpty()) { -// subscriptionsBySessionId.remove(sessionId); -// } -// TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId()); -// if (currentPartitions.contains(tpi)) { -// // Subscription is managed on the same server; -// subscriptionManagerService.cancelSubscription(sessionId, subscriptionId, TbCallback.EMPTY); -// } else { -// // Push to the queue; -// TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toCloseSubscriptionProto(subscription); -// clusterService.pushMsgToCore(tpi, subscription.getEntityId().getId(), toCoreMsg, null); -// } -// } else { -// log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId); -// } -// } else { -// log.debug("[{}] No session subscriptions found!", sessionId); -// } -// } - @Override public void cancelAllSessionSubscriptions(String sessionId) { -// Map subscriptions = subscriptionsBySessionId.get(sessionId); -// if (subscriptions != null) { -// Set toRemove = new HashSet<>(subscriptions.keySet()); -// toRemove.forEach(id -> cancelSubscription(sessionId, id)); -// } - } - - private TbSubscription resolveEntityViewSubscription(TbTimeseriesSubscription subscription) { - EntityView entityView = entityViewService.findEntityViewById(TenantId.SYS_TENANT_ID, new EntityViewId(subscription.getEntityId().getId())); - - Map keyStates; - if (subscription.isAllKeys()) { - keyStates = entityView.getKeys().getTimeseries().stream().collect(Collectors.toMap(k -> k, k -> 0L)); - } else { - keyStates = subscription.getKeyStates().entrySet() - .stream().filter(entry -> entityView.getKeys().getTimeseries().contains(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + Map sessionSubs = subscriptionsBySessionId.remove(sessionId); + if (sessionSubs != null) { + sessionSubs.values().forEach(this::cleanupAndCancel); } - - return TbTimeseriesSubscription.builder() - .serviceId(subscription.getServiceId()) - .sessionId(subscription.getSessionId()) - .subscriptionId(subscription.getSubscriptionId()) - .tenantId(subscription.getTenantId()) - .entityId(entityView.getEntityId()) - .startTime(entityView.getStartTimeMs()) - .endTime(entityView.getEndTimeMs()) - .allKeys(false) - .keyStates(keyStates).build(); } private int getLimit(int limit) { diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java index 607c91afeb..10b766498f 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java @@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.common.data.query.TsValue; import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; +import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd; import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd; import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd; @@ -39,9 +40,14 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; +import java.util.function.Function; +import java.util.stream.Collectors; @Slf4j @Data @@ -53,12 +59,14 @@ public class TbEntityDataSubCtx { private final TelemetryWebSocketSessionRef sessionRef; private final int cmdId; private EntityDataQuery query; - private LatestValueCmd latestCmd; private TimeSeriesCmd tsCmd; private PageData data; private boolean initialDataSent; private List tbSubs; private Map subToEntityIdMap; + private volatile ScheduledFuture refreshTask; + private TimeSeriesCmd curTsCmd; + private LatestValueCmd latestValueCmd; public TbEntityDataSubCtx(String serviceId, TelemetryWebSocketService wsService, TelemetryWebSocketSessionRef sessionRef, int cmdId) { this.serviceId = serviceId; @@ -86,34 +94,43 @@ public class TbEntityDataSubCtx { public List createSubscriptions(List keys, boolean resultToLatestValues) { this.subToEntityIdMap = new HashMap<>(); tbSubs = new ArrayList<>(); - Map> keysByType = new HashMap<>(); - keys.forEach(key -> keysByType.computeIfAbsent(key.getType(), k -> new ArrayList<>()).add(key)); + Map> keysByType = getEntityKeyByTypeMap(keys); for (EntityData entityData : data.getData()) { - keysByType.forEach((keysType, keysList) -> { - int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet(); - subToEntityIdMap.put(subIdx, entityData.getEntityId()); - switch (keysType) { - case TIME_SERIES: - tbSubs.add(createTsSub(entityData, subIdx, keysList, resultToLatestValues)); - break; - case CLIENT_ATTRIBUTE: - tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.CLIENT_SCOPE, keysList)); - break; - case SHARED_ATTRIBUTE: - tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SHARED_SCOPE, keysList)); - break; - case SERVER_ATTRIBUTE: - tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SERVER_SCOPE, keysList)); - break; - case ATTRIBUTE: - tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.ANY_SCOPE, keysList)); - break; - } - }); + addSubscription(entityData, keysByType, resultToLatestValues); } return tbSubs; } + private Map> getEntityKeyByTypeMap(List keys) { + Map> keysByType = new HashMap<>(); + keys.forEach(key -> keysByType.computeIfAbsent(key.getType(), k -> new ArrayList<>()).add(key)); + return keysByType; + } + + private void addSubscription(EntityData entityData, Map> keysByType, boolean resultToLatestValues) { + keysByType.forEach((keysType, keysList) -> { + int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet(); + subToEntityIdMap.put(subIdx, entityData.getEntityId()); + switch (keysType) { + case TIME_SERIES: + tbSubs.add(createTsSub(entityData, subIdx, keysList, resultToLatestValues)); + break; + case CLIENT_ATTRIBUTE: + tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.CLIENT_SCOPE, keysList)); + break; + case SHARED_ATTRIBUTE: + tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SHARED_SCOPE, keysList)); + break; + case SERVER_ATTRIBUTE: + tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SERVER_SCOPE, keysList)); + break; + case ATTRIBUTE: + tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.ANY_SCOPE, keysList)); + break; + } + }); + } + private TbSubscription createAttrSub(EntityData entityData, int subIdx, EntityKeyType keysType, TbAttributeSubscriptionScope scope, List subKeys) { Map keyStates = buildKeyStats(entityData, keysType, subKeys); log.trace("[{}][{}][{}] Creating attributes subscription with keys: {}", serviceId, cmdId, subIdx, keyStates); @@ -275,4 +292,74 @@ public class TbEntityDataSubCtx { return Collections.emptyList(); } } + + public void setRefreshTask(ScheduledFuture task) { + this.refreshTask = task; + } + + public void cancelRefreshTask() { + if (this.refreshTask != null) { + log.trace("[{}][{}] Canceling old refresh task", sessionRef.getSessionId(), cmdId); + this.refreshTask.cancel(true); + } + } + + public Collection update(PageData newData) { + Map oldDataMap; + if (data != null && !data.getData().isEmpty()) { + oldDataMap = data.getData().stream().collect(Collectors.toMap(EntityData::getEntityId, Function.identity())); + } else { + oldDataMap = Collections.emptyMap(); + } + Map newDataMap = newData.getData().stream().collect(Collectors.toMap(EntityData::getEntityId, Function.identity())); + if (oldDataMap.size() == newDataMap.size() && oldDataMap.keySet().equals(newDataMap.keySet())) { + log.trace("[{}][{}] No updates to entity data found", sessionRef.getSessionId(), cmdId); + return Collections.emptyList(); + } else { + this.data = newData; + List subIdsToRemove = new ArrayList<>(); + Set currentSubs = new HashSet<>(); + subToEntityIdMap.forEach((subId, entityId) -> { + if (!newDataMap.containsKey(entityId)) { + subIdsToRemove.add(subId); + } else { + currentSubs.add(entityId); + } + }); + log.trace("[{}][{}] Subscriptions that are invalid: {}", sessionRef.getSessionId(), cmdId, subIdsToRemove); + subIdsToRemove.forEach(subToEntityIdMap::remove); + List newSubsList = newDataMap.entrySet().stream().filter(entry -> !currentSubs.contains(entry.getKey())).map(Map.Entry::getValue).collect(Collectors.toList()); + if (!newSubsList.isEmpty()) { + boolean resultToLatestValues; + List keys = null; + if (curTsCmd != null) { + resultToLatestValues = false; + keys = curTsCmd.getKeys().stream().map(key -> new EntityKey(EntityKeyType.TIME_SERIES, key)).collect(Collectors.toList()); + } else if (latestValueCmd != null) { + resultToLatestValues = true; + keys = latestValueCmd.getKeys(); + } else { + resultToLatestValues = true; + } + if (keys != null && !keys.isEmpty()) { + Map> keysByType = getEntityKeyByTypeMap(keys); + newSubsList.forEach( + entity -> { + log.trace("[{}][{}] Found new subscription for entity: {}", sessionRef.getSessionId(), cmdId, entity.getEntityId()); + if (curTsCmd != null) { + addSubscription(entity, keysByType, resultToLatestValues); + } + } + ); + } + } + wsService.sendWsMsg(sessionRef.getSessionId(), new EntityDataUpdate(cmdId, data, null)); + return subIdsToRemove; + } + } + + public void setCurrentCmd(EntityDataCmd cmd) { + curTsCmd = cmd.getTsCmd(); + latestValueCmd = cmd.getLatestCmd(); + } } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubscriptionService.java index f0b5fb85b5..af561c8b5b 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubscriptionService.java @@ -15,13 +15,9 @@ */ package org.thingsboard.server.service.subscription; -import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.queue.discovery.ClusterTopologyChangeEvent; -import org.thingsboard.server.queue.discovery.PartitionChangeEvent; import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd; import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUnsubscribeCmd; -import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate; public interface TbEntityDataSubscriptionService { @@ -31,9 +27,4 @@ public interface TbEntityDataSubscriptionService { void cancelAllSessionSubscriptions(String sessionId); - void onSubscriptionUpdate(String sessionId, SubscriptionUpdate update, TbCallback callback); - - void onApplicationEvent(PartitionChangeEvent event); - - void onApplicationEvent(ClusterTopologyChangeEvent event); } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java index 678a191384..07bffedcb7 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java @@ -697,15 +697,18 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi } private void sendWsMsg(TelemetryWebSocketSessionRef sessionRef, int cmdId, Object update) { - executor.submit(() -> { - try { - msgEndpoint.send(sessionRef, cmdId, jsonMapper.writeValueAsString(update)); - } catch (JsonProcessingException e) { - log.warn("[{}] Failed to encode reply: {}", sessionRef.getSessionId(), update, e); - } catch (IOException e) { - log.warn("[{}] Failed to send reply: {}", sessionRef.getSessionId(), update, e); - } - }); + try { + String msg = jsonMapper.writeValueAsString(update); + executor.submit(() -> { + try { + msgEndpoint.send(sessionRef, cmdId, msg); + } catch (IOException e) { + log.warn("[{}] Failed to send reply: {}", sessionRef.getSessionId(), update, e); + } + }); + } catch (JsonProcessingException e) { + log.warn("[{}] Failed to encode reply: {}", sessionRef.getSessionId(), update, e); + } } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/EntityHistoryCmd.java b/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/EntityHistoryCmd.java index af7f6f4d34..8e3aafac25 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/EntityHistoryCmd.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/EntityHistoryCmd.java @@ -29,5 +29,6 @@ public class EntityHistoryCmd { private long interval; private int limit; private Aggregation agg; + private boolean fetchLatestPreviousPoint; } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 90b84dfbad..b9070a8931 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -46,6 +46,8 @@ server: max_subscriptions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_REGULAR_USER:0}" max_subscriptions_per_public_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_PUBLIC_USER:0}" max_updates_per_session: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_UPDATES_PER_SESSION:300:1,3000:60}" + dynamic_page_link_refresh_interval: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_INTERVAL_SEC:6}" + dynamic_page_link_refresh_pool_size: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_POOL_SIZE:1}" rest: limits: tenant: diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseReadTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseReadTsKvQuery.java index a2433440a7..9e40bc11e2 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseReadTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseReadTsKvQuery.java @@ -23,27 +23,27 @@ public class BaseReadTsKvQuery extends BaseTsKvQuery implements ReadTsKvQuery { private final long interval; private final int limit; private final Aggregation aggregation; - private final String orderBy; + private final String order; public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) { this(key, startTs, endTs, interval, limit, aggregation, "DESC"); } public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, - String orderBy) { + String order) { super(key, startTs, endTs); this.interval = interval; this.limit = limit; this.aggregation = aggregation; - this.orderBy = orderBy; + this.order = order; } public BaseReadTsKvQuery(String key, long startTs, long endTs) { this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC"); } - public BaseReadTsKvQuery(String key, long startTs, long endTs, int limit, String orderBy) { - this(key, startTs, endTs, endTs - startTs, limit, Aggregation.NONE, orderBy); + public BaseReadTsKvQuery(String key, long startTs, long endTs, int limit, String order) { + this(key, startTs, endTs, endTs - startTs, limit, Aggregation.NONE, order); } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQuery.java index 5b73693af3..9c5e6541bc 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQuery.java @@ -23,6 +23,6 @@ public interface ReadTsKvQuery extends TsKvQuery { Aggregation getAggregation(); - String getOrderBy(); + String getOrder(); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityDataPageLink.java b/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityDataPageLink.java index df37c70f35..e13fe65158 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityDataPageLink.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityDataPageLink.java @@ -27,13 +27,17 @@ public class EntityDataPageLink { private int page; private String textSearch; private EntityDataSortOrder sortOrder; + private boolean dynamic = false; public EntityDataPageLink() { } + public EntityDataPageLink(int pageSize, int page, String textSearch, EntityDataSortOrder sortOrder) { + this(pageSize, page, textSearch, sortOrder, false); + } + @JsonIgnore public EntityDataPageLink nextPageLink() { return new EntityDataPageLink(this.pageSize, this.page+1, this.textSearch, this.sortOrder); } - } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java index c663d4346a..501eee125a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractChunkedAggregationTimeseriesDao.java @@ -152,7 +152,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq query.getEndTs(), PageRequest.of(0, query.getLimit(), Sort.by(Sort.Direction.fromString( - query.getOrderBy()), "ts"))); + query.getOrder()), "ts"))); tsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(query.getKey())); return Futures.immediateFuture(DaoUtil.convertDataList(tsKvEntities)); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index d9121f684e..5649a7e2df 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -110,7 +110,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements query.getEndTs(), PageRequest.of(0, query.getLimit(), Sort.by(Sort.Direction.fromString( - query.getOrderBy()), "ts"))); + query.getOrder()), "ts"))); timescaleTsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(strKey)); return Futures.immediateFuture(DaoUtil.convertDataList(timescaleTsKvEntities)); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 3486369999..376ea6499e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -170,7 +170,7 @@ public class BaseTimeseriesService implements TimeseriesService { } else { endTs = query.getEndTs(); } - return new BaseReadTsKvQuery(query.getKey(), startTs, endTs, query.getInterval(), query.getLimit(), query.getAggregation(), query.getOrderBy()); + return new BaseReadTsKvQuery(query.getKey(), startTs, endTs, query.getInterval(), query.getLimit(), query.getAggregation(), query.getOrder()); }).collect(Collectors.toList()); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 9105e77956..3eeff89a90 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -168,7 +168,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem while (stepTs < query.getEndTs()) { long startTs = stepTs; long endTs = stepTs + step; - ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy()); + ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrder()); futures.add(findAndAggregateAsync(tenantId, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs))); stepTs = endTs; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java index d332cddc26..ccddddb5e1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java @@ -40,7 +40,7 @@ public class TsKvQueryCursor extends QueryCursor { public TsKvQueryCursor(String entityType, UUID entityId, ReadTsKvQuery baseQuery, List partitions) { super(entityType, entityId, baseQuery, partitions); - this.orderBy = baseQuery.getOrderBy(); + this.orderBy = baseQuery.getOrder(); this.partitionIndex = isDesc() ? partitions.size() - 1 : 0; this.data = new ArrayList<>(); this.currentLimit = baseQuery.getLimit();