Improvements to WS API
This commit is contained in:
parent
f028e56656
commit
44f00eb011
@ -19,17 +19,16 @@ import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.EntityView;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.EntityViewId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
|
||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
||||
@ -40,19 +39,12 @@ import org.thingsboard.server.common.data.query.EntityDataQuery;
|
||||
import org.thingsboard.server.common.data.query.EntityKey;
|
||||
import org.thingsboard.server.common.data.query.EntityKeyType;
|
||||
import org.thingsboard.server.common.data.query.TsValue;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.dao.entity.EntityService;
|
||||
import org.thingsboard.server.dao.entityview.EntityViewService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.queue.discovery.ClusterTopologyChangeEvent;
|
||||
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.queue.TbClusterService;
|
||||
import org.thingsboard.server.service.security.permission.Operation;
|
||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||
import org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService;
|
||||
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
|
||||
import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
|
||||
@ -63,12 +55,14 @@ import org.thingsboard.server.service.telemetry.cmd.v2.EntityHistoryCmd;
|
||||
import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd;
|
||||
import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd;
|
||||
import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
|
||||
import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
@ -79,6 +73,12 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@ -87,28 +87,14 @@ import java.util.stream.Collectors;
|
||||
public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubscriptionService {
|
||||
|
||||
private static final int DEFAULT_LIMIT = 100;
|
||||
private final Set<TopicPartitionInfo> currentPartitions = ConcurrentHashMap.newKeySet();
|
||||
private final Map<String, Map<Integer, TbEntityDataSubCtx>> subscriptionsBySessionId = new ConcurrentHashMap<>();
|
||||
|
||||
@Autowired
|
||||
private TelemetryWebSocketService wsService;
|
||||
|
||||
@Autowired
|
||||
private EntityViewService entityViewService;
|
||||
|
||||
@Autowired
|
||||
private EntityService entityService;
|
||||
|
||||
@Autowired
|
||||
private PartitionService partitionService;
|
||||
|
||||
@Autowired
|
||||
private TbClusterService clusterService;
|
||||
|
||||
@Autowired
|
||||
@Lazy
|
||||
private SubscriptionManagerService subscriptionManagerService;
|
||||
|
||||
@Autowired
|
||||
@Lazy
|
||||
private TbLocalSubscriptionService localSubscriptionService;
|
||||
@ -119,18 +105,38 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
|
||||
@Autowired
|
||||
private TbServiceInfoProvider serviceInfoProvider;
|
||||
|
||||
@Autowired
|
||||
@Getter
|
||||
private DbCallbackExecutorService dbCallbackExecutor;
|
||||
|
||||
private ScheduledExecutorService scheduler;
|
||||
|
||||
@Value("${database.ts.type}")
|
||||
private String databaseTsType;
|
||||
@Value("${server.ws.dynamic_page_link_refresh_interval:6}")
|
||||
private long dynamicPageLinkRefreshInterval;
|
||||
@Value("${server.ws.dynamic_page_link_refresh_pool_size:1}")
|
||||
private int dynamicPageLinkRefreshPoolSize;
|
||||
|
||||
private ExecutorService wsCallBackExecutor;
|
||||
private boolean tsInSqlDB;
|
||||
private String serviceId;
|
||||
private AtomicInteger regularQueryInvocationCnt = new AtomicInteger();
|
||||
private AtomicInteger dynamicQueryInvocationCnt = new AtomicInteger();
|
||||
private AtomicLong regularQueryTimeSpent = new AtomicLong();
|
||||
private AtomicLong dynamicQueryTimeSpent = new AtomicLong();
|
||||
|
||||
@PostConstruct
|
||||
public void initExecutor() {
|
||||
serviceId = serviceInfoProvider.getServiceId();
|
||||
wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ws-entity-sub-callback"));
|
||||
tsInSqlDB = databaseTsType.equalsIgnoreCase("sql") || databaseTsType.equalsIgnoreCase("timescale");
|
||||
ThreadFactory tbThreadFactory = ThingsBoardThreadFactory.forName("ws-entity-sub-scheduler");
|
||||
if (dynamicPageLinkRefreshPoolSize == 1) {
|
||||
scheduler = Executors.newSingleThreadScheduledExecutor(tbThreadFactory);
|
||||
} else {
|
||||
scheduler = Executors.newScheduledThreadPool(dynamicPageLinkRefreshPoolSize, tbThreadFactory);
|
||||
}
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
@ -140,45 +146,19 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@EventListener(PartitionChangeEvent.class)
|
||||
public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
|
||||
if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) {
|
||||
currentPartitions.clear();
|
||||
currentPartitions.addAll(partitionChangeEvent.getPartitions());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@EventListener(ClusterTopologyChangeEvent.class)
|
||||
public void onApplicationEvent(ClusterTopologyChangeEvent event) {
|
||||
if (event.getServiceQueueKeys().stream().anyMatch(key -> ServiceType.TB_CORE.equals(key.getServiceType()))) {
|
||||
/*
|
||||
* If the cluster topology has changed, we need to push all current subscriptions to SubscriptionManagerService again.
|
||||
* Otherwise, the SubscriptionManagerService may "forget" those subscriptions in case of restart.
|
||||
* Although this is resource consuming operation, it is cheaper than sending ping/pong commands periodically
|
||||
* It is also cheaper then caching the subscriptions by entity id and then lookup of those caches every time we have new telemetry in SubscriptionManagerService.
|
||||
* Even if we cache locally the list of active subscriptions by entity id, it is still time consuming operation to get them from cache
|
||||
* Since number of subscriptions is usually much less then number of devices that are pushing data.
|
||||
// */
|
||||
// subscriptionsBySessionId.values().forEach(map -> map.values()
|
||||
// .forEach(sub -> pushSubscriptionToManagerService(sub, false)));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleCmd(TelemetryWebSocketSessionRef session, EntityDataCmd cmd) {
|
||||
TbEntityDataSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId());
|
||||
if (ctx != null) {
|
||||
log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd);
|
||||
if (cmd.getLatestCmd() != null || cmd.getTsCmd() != null || cmd.getHistoryCmd() != null) {
|
||||
Collection<Integer> oldSubIds = ctx.clearSubscriptions();
|
||||
oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId));
|
||||
clearSubs(ctx);
|
||||
}
|
||||
} else {
|
||||
log.debug("[{}][{}] Creating new subscription using: {}", session.getSessionId(), cmd.getCmdId(), cmd);
|
||||
ctx = createSubCtx(session, cmd);
|
||||
}
|
||||
ctx.setCurrentCmd(cmd);
|
||||
if (cmd.getQuery() != null) {
|
||||
if (ctx.getQuery() == null) {
|
||||
log.debug("[{}][{}] Initializing data using query: {}", session.getSessionId(), cmd.getCmdId(), cmd.getQuery());
|
||||
@ -197,13 +177,26 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
|
||||
}
|
||||
});
|
||||
}
|
||||
long start = System.currentTimeMillis();
|
||||
PageData<EntityData> data = entityService.findEntityDataByQuery(tenantId, customerId, ctx.getQuery());
|
||||
long end = System.currentTimeMillis();
|
||||
regularQueryInvocationCnt.incrementAndGet();
|
||||
regularQueryTimeSpent.addAndGet(end - start);
|
||||
|
||||
if (log.isTraceEnabled()) {
|
||||
data.getData().forEach(ed -> {
|
||||
log.trace("[{}][{}] EntityData: {}", session.getSessionId(), cmd.getCmdId(), ed);
|
||||
});
|
||||
}
|
||||
ctx.setData(data);
|
||||
ctx.cancelRefreshTask();
|
||||
if (ctx.getQuery().getPageLink().isDynamic()) {
|
||||
TbEntityDataSubCtx finalCtx = ctx;
|
||||
ScheduledFuture<?> task = scheduler.scheduleWithFixedDelay(
|
||||
() -> refreshDynamicQuery(tenantId, customerId, finalCtx),
|
||||
dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS);
|
||||
finalCtx.setRefreshTask(task);
|
||||
}
|
||||
}
|
||||
ListenableFuture<TbEntityDataSubCtx> historyFuture;
|
||||
if (cmd.getHistoryCmd() != null) {
|
||||
@ -233,6 +226,35 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
|
||||
}, wsCallBackExecutor);
|
||||
}
|
||||
|
||||
private void refreshDynamicQuery(TenantId tenantId, CustomerId customerId, TbEntityDataSubCtx finalCtx) {
|
||||
try {
|
||||
long start = System.currentTimeMillis();
|
||||
Collection<Integer> oldSubIds = finalCtx.update(entityService.findEntityDataByQuery(tenantId, customerId, finalCtx.getQuery()));
|
||||
long end = System.currentTimeMillis();
|
||||
dynamicQueryInvocationCnt.incrementAndGet();
|
||||
dynamicQueryTimeSpent.addAndGet(end - start);
|
||||
oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId));
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}][{}] Failed to refresh query", finalCtx.getSessionId(), finalCtx.getCmdId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelayString = "${server.ws.dynamic_page_link_stats:10000}")
|
||||
public void printStats() {
|
||||
int regularQueryInvocationCntValue = regularQueryInvocationCnt.getAndSet(0);
|
||||
long regularQueryInvocationTimeValue = regularQueryTimeSpent.getAndSet(0);
|
||||
int dynamicQueryInvocationCntValue = dynamicQueryInvocationCnt.getAndSet(0);
|
||||
long dynamicQueryInvocationTimeValue = dynamicQueryTimeSpent.getAndSet(0);
|
||||
long dynamicQueryCnt = subscriptionsBySessionId.values().stream().map(Map::values).count();
|
||||
log.info("Stats: regularQueryInvocationCnt = [{}], regularQueryInvocationTime = [{}], dynamicQueryCnt = [{}] dynamicQueryInvocationCnt = [{}], dynamicQueryInvocationTime = [{}]",
|
||||
regularQueryInvocationCntValue, regularQueryInvocationTimeValue, dynamicQueryCnt, dynamicQueryInvocationCntValue, dynamicQueryInvocationTimeValue);
|
||||
}
|
||||
|
||||
private void clearSubs(TbEntityDataSubCtx ctx) {
|
||||
Collection<Integer> oldSubIds = ctx.clearSubscriptions();
|
||||
oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId));
|
||||
}
|
||||
|
||||
private TbEntityDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, EntityDataCmd cmd) {
|
||||
Map<Integer, TbEntityDataSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>());
|
||||
TbEntityDataSubCtx ctx = new TbEntityDataSubCtx(serviceId, wsService, sessionRef, cmd.getCmdId());
|
||||
@ -282,6 +304,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
|
||||
}
|
||||
wsService.sendWsMsg(ctx.getSessionId(), update);
|
||||
createSubscriptions(ctx, keys.stream().map(key -> new EntityKey(EntityKeyType.TIME_SERIES, key)).collect(Collectors.toList()), false);
|
||||
ctx.getData().getData().forEach(ed -> ed.getTimeseries().clear());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -378,12 +401,21 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
|
||||
}
|
||||
|
||||
private ListenableFuture<TbEntityDataSubCtx> handleHistoryCmd(TbEntityDataSubCtx ctx, EntityHistoryCmd historyCmd) {
|
||||
List<ReadTsKvQuery> finalTsKvQueryList;
|
||||
List<ReadTsKvQuery> tsKvQueryList = historyCmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(
|
||||
key, historyCmd.getStartTs(), historyCmd.getEndTs(), historyCmd.getInterval(), getLimit(historyCmd.getLimit()), historyCmd.getAgg()
|
||||
)).collect(Collectors.toList());
|
||||
if (historyCmd.isFetchLatestPreviousPoint()) {
|
||||
finalTsKvQueryList = new ArrayList<>(tsKvQueryList);
|
||||
tsKvQueryList.addAll(historyCmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(
|
||||
key, historyCmd.getStartTs() - TimeUnit.DAYS.toMillis(365), historyCmd.getStartTs(), historyCmd.getInterval(), 1, historyCmd.getAgg()
|
||||
)).collect(Collectors.toList()));
|
||||
} else {
|
||||
finalTsKvQueryList = tsKvQueryList;
|
||||
}
|
||||
Map<EntityData, ListenableFuture<List<TsKvEntry>>> fetchResultMap = new HashMap<>();
|
||||
ctx.getData().getData().forEach(entityData -> fetchResultMap.put(entityData,
|
||||
tsService.findAll(ctx.getTenantId(), entityData.getEntityId(), tsKvQueryList)));
|
||||
tsService.findAll(ctx.getTenantId(), entityData.getEntityId(), finalTsKvQueryList)));
|
||||
return Futures.transform(Futures.allAsList(fetchResultMap.values()), f -> {
|
||||
fetchResultMap.forEach((entityData, future) -> {
|
||||
Map<String, List<TsValue>> keyData = new LinkedHashMap<>();
|
||||
@ -394,6 +426,11 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
|
||||
entityTsData.forEach(entry -> keyData.get(entry.getKey()).add(new TsValue(entry.getTs(), entry.getValueAsString())));
|
||||
}
|
||||
keyData.forEach((k, v) -> entityData.getTimeseries().put(k, v.toArray(new TsValue[v.size()])));
|
||||
if (historyCmd.isFetchLatestPreviousPoint()) {
|
||||
entityData.getTimeseries().values().forEach(dataArray -> {
|
||||
Arrays.sort(dataArray, (o1, o2) -> Long.compare(o2.getTs(), o1.getTs()));
|
||||
});
|
||||
}
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
log.warn("[{}][{}][{}] Failed to fetch historical data", ctx.getSessionId(), ctx.getCmdId(), entityData.getEntityId(), e);
|
||||
wsService.sendWsMsg(ctx.getSessionId(),
|
||||
@ -408,119 +445,29 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
|
||||
update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData());
|
||||
}
|
||||
wsService.sendWsMsg(ctx.getSessionId(), update);
|
||||
ctx.getData().getData().forEach(ed -> ed.getTimeseries().clear());
|
||||
return ctx;
|
||||
}, wsCallBackExecutor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelSubscription(String sessionId, EntityDataUnsubscribeCmd cmd) {
|
||||
TbEntityDataSubCtx ctx = getSubCtx(sessionId, cmd.getCmdId());
|
||||
cleanupAndCancel(getSubCtx(sessionId, cmd.getCmdId()));
|
||||
}
|
||||
|
||||
// //TODO 3.1: replace null callbacks with callbacks from websocket service.
|
||||
// @Override
|
||||
// public void addSubscription(TbSubscription subscription) {
|
||||
// EntityId entityId = subscription.getEntityId();
|
||||
// // Telemetry subscription on Entity Views are handled differently, because we need to allow only certain keys and time ranges;
|
||||
// if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW) && TbSubscriptionType.TIMESERIES.equals(subscription.getType())) {
|
||||
// subscription = resolveEntityViewSubscription((TbTimeseriesSubscription) subscription);
|
||||
// }
|
||||
// pushSubscriptionToManagerService(subscription, true);
|
||||
// registerSubscription(subscription);
|
||||
// }
|
||||
|
||||
// private void pushSubscriptionToManagerService(TbSubscription subscription, boolean pushToLocalService) {
|
||||
// TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId());
|
||||
// if (currentPartitions.contains(tpi)) {
|
||||
// // Subscription is managed on the same server;
|
||||
// if (pushToLocalService) {
|
||||
// subscriptionManagerService.addSubscription(subscription, TbCallback.EMPTY);
|
||||
// }
|
||||
// } else {
|
||||
// // Push to the queue;
|
||||
// TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toNewSubscriptionProto(subscription);
|
||||
// clusterService.pushMsgToCore(tpi, subscription.getEntityId().getId(), toCoreMsg, null);
|
||||
// }
|
||||
// }
|
||||
|
||||
@Override
|
||||
public void onSubscriptionUpdate(String sessionId, SubscriptionUpdate update, TbCallback callback) {
|
||||
// TbSubscription subscription = subscriptionsBySessionId
|
||||
// .getOrDefault(sessionId, Collections.emptyMap()).get(update.getSubscriptionId());
|
||||
// if (subscription != null) {
|
||||
// switch (subscription.getType()) {
|
||||
// case TIMESERIES:
|
||||
// TbTimeseriesSubscription tsSub = (TbTimeseriesSubscription) subscription;
|
||||
// update.getLatestValues().forEach((key, value) -> tsSub.getKeyStates().put(key, value));
|
||||
// break;
|
||||
// case ATTRIBUTES:
|
||||
// TbAttributeSubscription attrSub = (TbAttributeSubscription) subscription;
|
||||
// update.getLatestValues().forEach((key, value) -> attrSub.getKeyStates().put(key, value));
|
||||
// break;
|
||||
// }
|
||||
// wsService.sendWsMsg(sessionId, update);
|
||||
// }
|
||||
// callback.onSuccess();
|
||||
private void cleanupAndCancel(TbEntityDataSubCtx ctx) {
|
||||
if (ctx != null) {
|
||||
ctx.cancelRefreshTask();
|
||||
clearSubs(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
// @Override
|
||||
// public void cancelSubscription(String sessionId, int subscriptionId) {
|
||||
// log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId);
|
||||
// Map<Integer, TbSubscription> sessionSubscriptions = subscriptionsBySessionId.get(sessionId);
|
||||
// if (sessionSubscriptions != null) {
|
||||
// TbSubscription subscription = sessionSubscriptions.remove(subscriptionId);
|
||||
// if (subscription != null) {
|
||||
// if (sessionSubscriptions.isEmpty()) {
|
||||
// subscriptionsBySessionId.remove(sessionId);
|
||||
// }
|
||||
// TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId());
|
||||
// if (currentPartitions.contains(tpi)) {
|
||||
// // Subscription is managed on the same server;
|
||||
// subscriptionManagerService.cancelSubscription(sessionId, subscriptionId, TbCallback.EMPTY);
|
||||
// } else {
|
||||
// // Push to the queue;
|
||||
// TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toCloseSubscriptionProto(subscription);
|
||||
// clusterService.pushMsgToCore(tpi, subscription.getEntityId().getId(), toCoreMsg, null);
|
||||
// }
|
||||
// } else {
|
||||
// log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId);
|
||||
// }
|
||||
// } else {
|
||||
// log.debug("[{}] No session subscriptions found!", sessionId);
|
||||
// }
|
||||
// }
|
||||
|
||||
@Override
|
||||
public void cancelAllSessionSubscriptions(String sessionId) {
|
||||
// Map<Integer, TbSubscription> subscriptions = subscriptionsBySessionId.get(sessionId);
|
||||
// if (subscriptions != null) {
|
||||
// Set<Integer> toRemove = new HashSet<>(subscriptions.keySet());
|
||||
// toRemove.forEach(id -> cancelSubscription(sessionId, id));
|
||||
// }
|
||||
}
|
||||
|
||||
private TbSubscription resolveEntityViewSubscription(TbTimeseriesSubscription subscription) {
|
||||
EntityView entityView = entityViewService.findEntityViewById(TenantId.SYS_TENANT_ID, new EntityViewId(subscription.getEntityId().getId()));
|
||||
|
||||
Map<String, Long> keyStates;
|
||||
if (subscription.isAllKeys()) {
|
||||
keyStates = entityView.getKeys().getTimeseries().stream().collect(Collectors.toMap(k -> k, k -> 0L));
|
||||
} else {
|
||||
keyStates = subscription.getKeyStates().entrySet()
|
||||
.stream().filter(entry -> entityView.getKeys().getTimeseries().contains(entry.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
Map<Integer, TbEntityDataSubCtx> sessionSubs = subscriptionsBySessionId.remove(sessionId);
|
||||
if (sessionSubs != null) {
|
||||
sessionSubs.values().forEach(this::cleanupAndCancel);
|
||||
}
|
||||
|
||||
return TbTimeseriesSubscription.builder()
|
||||
.serviceId(subscription.getServiceId())
|
||||
.sessionId(subscription.getSessionId())
|
||||
.subscriptionId(subscription.getSubscriptionId())
|
||||
.tenantId(subscription.getTenantId())
|
||||
.entityId(entityView.getEntityId())
|
||||
.startTime(entityView.getStartTimeMs())
|
||||
.endTime(entityView.getEndTimeMs())
|
||||
.allKeys(false)
|
||||
.keyStates(keyStates).build();
|
||||
}
|
||||
|
||||
private int getLimit(int limit) {
|
||||
|
||||
@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.query.EntityKeyType;
|
||||
import org.thingsboard.server.common.data.query.TsValue;
|
||||
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
|
||||
import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
|
||||
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
|
||||
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
|
||||
import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd;
|
||||
import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd;
|
||||
@ -39,9 +40,14 @@ import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@Data
|
||||
@ -53,12 +59,14 @@ public class TbEntityDataSubCtx {
|
||||
private final TelemetryWebSocketSessionRef sessionRef;
|
||||
private final int cmdId;
|
||||
private EntityDataQuery query;
|
||||
private LatestValueCmd latestCmd;
|
||||
private TimeSeriesCmd tsCmd;
|
||||
private PageData<EntityData> data;
|
||||
private boolean initialDataSent;
|
||||
private List<TbSubscription> tbSubs;
|
||||
private Map<Integer, EntityId> subToEntityIdMap;
|
||||
private volatile ScheduledFuture<?> refreshTask;
|
||||
private TimeSeriesCmd curTsCmd;
|
||||
private LatestValueCmd latestValueCmd;
|
||||
|
||||
public TbEntityDataSubCtx(String serviceId, TelemetryWebSocketService wsService, TelemetryWebSocketSessionRef sessionRef, int cmdId) {
|
||||
this.serviceId = serviceId;
|
||||
@ -86,34 +94,43 @@ public class TbEntityDataSubCtx {
|
||||
public List<TbSubscription> createSubscriptions(List<EntityKey> keys, boolean resultToLatestValues) {
|
||||
this.subToEntityIdMap = new HashMap<>();
|
||||
tbSubs = new ArrayList<>();
|
||||
Map<EntityKeyType, List<EntityKey>> keysByType = new HashMap<>();
|
||||
keys.forEach(key -> keysByType.computeIfAbsent(key.getType(), k -> new ArrayList<>()).add(key));
|
||||
Map<EntityKeyType, List<EntityKey>> keysByType = getEntityKeyByTypeMap(keys);
|
||||
for (EntityData entityData : data.getData()) {
|
||||
keysByType.forEach((keysType, keysList) -> {
|
||||
int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet();
|
||||
subToEntityIdMap.put(subIdx, entityData.getEntityId());
|
||||
switch (keysType) {
|
||||
case TIME_SERIES:
|
||||
tbSubs.add(createTsSub(entityData, subIdx, keysList, resultToLatestValues));
|
||||
break;
|
||||
case CLIENT_ATTRIBUTE:
|
||||
tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.CLIENT_SCOPE, keysList));
|
||||
break;
|
||||
case SHARED_ATTRIBUTE:
|
||||
tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SHARED_SCOPE, keysList));
|
||||
break;
|
||||
case SERVER_ATTRIBUTE:
|
||||
tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SERVER_SCOPE, keysList));
|
||||
break;
|
||||
case ATTRIBUTE:
|
||||
tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.ANY_SCOPE, keysList));
|
||||
break;
|
||||
}
|
||||
});
|
||||
addSubscription(entityData, keysByType, resultToLatestValues);
|
||||
}
|
||||
return tbSubs;
|
||||
}
|
||||
|
||||
private Map<EntityKeyType, List<EntityKey>> getEntityKeyByTypeMap(List<EntityKey> keys) {
|
||||
Map<EntityKeyType, List<EntityKey>> keysByType = new HashMap<>();
|
||||
keys.forEach(key -> keysByType.computeIfAbsent(key.getType(), k -> new ArrayList<>()).add(key));
|
||||
return keysByType;
|
||||
}
|
||||
|
||||
private void addSubscription(EntityData entityData, Map<EntityKeyType, List<EntityKey>> keysByType, boolean resultToLatestValues) {
|
||||
keysByType.forEach((keysType, keysList) -> {
|
||||
int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet();
|
||||
subToEntityIdMap.put(subIdx, entityData.getEntityId());
|
||||
switch (keysType) {
|
||||
case TIME_SERIES:
|
||||
tbSubs.add(createTsSub(entityData, subIdx, keysList, resultToLatestValues));
|
||||
break;
|
||||
case CLIENT_ATTRIBUTE:
|
||||
tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.CLIENT_SCOPE, keysList));
|
||||
break;
|
||||
case SHARED_ATTRIBUTE:
|
||||
tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SHARED_SCOPE, keysList));
|
||||
break;
|
||||
case SERVER_ATTRIBUTE:
|
||||
tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SERVER_SCOPE, keysList));
|
||||
break;
|
||||
case ATTRIBUTE:
|
||||
tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.ANY_SCOPE, keysList));
|
||||
break;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private TbSubscription createAttrSub(EntityData entityData, int subIdx, EntityKeyType keysType, TbAttributeSubscriptionScope scope, List<EntityKey> subKeys) {
|
||||
Map<String, Long> keyStates = buildKeyStats(entityData, keysType, subKeys);
|
||||
log.trace("[{}][{}][{}] Creating attributes subscription with keys: {}", serviceId, cmdId, subIdx, keyStates);
|
||||
@ -275,4 +292,74 @@ public class TbEntityDataSubCtx {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
public void setRefreshTask(ScheduledFuture<?> task) {
|
||||
this.refreshTask = task;
|
||||
}
|
||||
|
||||
public void cancelRefreshTask() {
|
||||
if (this.refreshTask != null) {
|
||||
log.trace("[{}][{}] Canceling old refresh task", sessionRef.getSessionId(), cmdId);
|
||||
this.refreshTask.cancel(true);
|
||||
}
|
||||
}
|
||||
|
||||
public Collection<Integer> update(PageData<EntityData> newData) {
|
||||
Map<EntityId, EntityData> oldDataMap;
|
||||
if (data != null && !data.getData().isEmpty()) {
|
||||
oldDataMap = data.getData().stream().collect(Collectors.toMap(EntityData::getEntityId, Function.identity()));
|
||||
} else {
|
||||
oldDataMap = Collections.emptyMap();
|
||||
}
|
||||
Map<EntityId, EntityData> newDataMap = newData.getData().stream().collect(Collectors.toMap(EntityData::getEntityId, Function.identity()));
|
||||
if (oldDataMap.size() == newDataMap.size() && oldDataMap.keySet().equals(newDataMap.keySet())) {
|
||||
log.trace("[{}][{}] No updates to entity data found", sessionRef.getSessionId(), cmdId);
|
||||
return Collections.emptyList();
|
||||
} else {
|
||||
this.data = newData;
|
||||
List<Integer> subIdsToRemove = new ArrayList<>();
|
||||
Set<EntityId> currentSubs = new HashSet<>();
|
||||
subToEntityIdMap.forEach((subId, entityId) -> {
|
||||
if (!newDataMap.containsKey(entityId)) {
|
||||
subIdsToRemove.add(subId);
|
||||
} else {
|
||||
currentSubs.add(entityId);
|
||||
}
|
||||
});
|
||||
log.trace("[{}][{}] Subscriptions that are invalid: {}", sessionRef.getSessionId(), cmdId, subIdsToRemove);
|
||||
subIdsToRemove.forEach(subToEntityIdMap::remove);
|
||||
List<EntityData> newSubsList = newDataMap.entrySet().stream().filter(entry -> !currentSubs.contains(entry.getKey())).map(Map.Entry::getValue).collect(Collectors.toList());
|
||||
if (!newSubsList.isEmpty()) {
|
||||
boolean resultToLatestValues;
|
||||
List<EntityKey> keys = null;
|
||||
if (curTsCmd != null) {
|
||||
resultToLatestValues = false;
|
||||
keys = curTsCmd.getKeys().stream().map(key -> new EntityKey(EntityKeyType.TIME_SERIES, key)).collect(Collectors.toList());
|
||||
} else if (latestValueCmd != null) {
|
||||
resultToLatestValues = true;
|
||||
keys = latestValueCmd.getKeys();
|
||||
} else {
|
||||
resultToLatestValues = true;
|
||||
}
|
||||
if (keys != null && !keys.isEmpty()) {
|
||||
Map<EntityKeyType, List<EntityKey>> keysByType = getEntityKeyByTypeMap(keys);
|
||||
newSubsList.forEach(
|
||||
entity -> {
|
||||
log.trace("[{}][{}] Found new subscription for entity: {}", sessionRef.getSessionId(), cmdId, entity.getEntityId());
|
||||
if (curTsCmd != null) {
|
||||
addSubscription(entity, keysByType, resultToLatestValues);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
wsService.sendWsMsg(sessionRef.getSessionId(), new EntityDataUpdate(cmdId, data, null));
|
||||
return subIdsToRemove;
|
||||
}
|
||||
}
|
||||
|
||||
public void setCurrentCmd(EntityDataCmd cmd) {
|
||||
curTsCmd = cmd.getTsCmd();
|
||||
latestValueCmd = cmd.getLatestCmd();
|
||||
}
|
||||
}
|
||||
|
||||
@ -15,13 +15,9 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.subscription;
|
||||
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.queue.discovery.ClusterTopologyChangeEvent;
|
||||
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
|
||||
import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
|
||||
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
|
||||
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUnsubscribeCmd;
|
||||
import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
|
||||
|
||||
public interface TbEntityDataSubscriptionService {
|
||||
|
||||
@ -31,9 +27,4 @@ public interface TbEntityDataSubscriptionService {
|
||||
|
||||
void cancelAllSessionSubscriptions(String sessionId);
|
||||
|
||||
void onSubscriptionUpdate(String sessionId, SubscriptionUpdate update, TbCallback callback);
|
||||
|
||||
void onApplicationEvent(PartitionChangeEvent event);
|
||||
|
||||
void onApplicationEvent(ClusterTopologyChangeEvent event);
|
||||
}
|
||||
|
||||
@ -697,15 +697,18 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
|
||||
}
|
||||
|
||||
private void sendWsMsg(TelemetryWebSocketSessionRef sessionRef, int cmdId, Object update) {
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
msgEndpoint.send(sessionRef, cmdId, jsonMapper.writeValueAsString(update));
|
||||
} catch (JsonProcessingException e) {
|
||||
log.warn("[{}] Failed to encode reply: {}", sessionRef.getSessionId(), update, e);
|
||||
} catch (IOException e) {
|
||||
log.warn("[{}] Failed to send reply: {}", sessionRef.getSessionId(), update, e);
|
||||
}
|
||||
});
|
||||
try {
|
||||
String msg = jsonMapper.writeValueAsString(update);
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
msgEndpoint.send(sessionRef, cmdId, msg);
|
||||
} catch (IOException e) {
|
||||
log.warn("[{}] Failed to send reply: {}", sessionRef.getSessionId(), update, e);
|
||||
}
|
||||
});
|
||||
} catch (JsonProcessingException e) {
|
||||
log.warn("[{}] Failed to encode reply: {}", sessionRef.getSessionId(), update, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -29,5 +29,6 @@ public class EntityHistoryCmd {
|
||||
private long interval;
|
||||
private int limit;
|
||||
private Aggregation agg;
|
||||
private boolean fetchLatestPreviousPoint;
|
||||
|
||||
}
|
||||
|
||||
@ -46,6 +46,8 @@ server:
|
||||
max_subscriptions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_REGULAR_USER:0}"
|
||||
max_subscriptions_per_public_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_PUBLIC_USER:0}"
|
||||
max_updates_per_session: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_UPDATES_PER_SESSION:300:1,3000:60}"
|
||||
dynamic_page_link_refresh_interval: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_INTERVAL_SEC:6}"
|
||||
dynamic_page_link_refresh_pool_size: "${TB_SERVER_WS_DYNAMIC_PAGE_LINK_REFRESH_POOL_SIZE:1}"
|
||||
rest:
|
||||
limits:
|
||||
tenant:
|
||||
|
||||
@ -23,27 +23,27 @@ public class BaseReadTsKvQuery extends BaseTsKvQuery implements ReadTsKvQuery {
|
||||
private final long interval;
|
||||
private final int limit;
|
||||
private final Aggregation aggregation;
|
||||
private final String orderBy;
|
||||
private final String order;
|
||||
|
||||
public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) {
|
||||
this(key, startTs, endTs, interval, limit, aggregation, "DESC");
|
||||
}
|
||||
|
||||
public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation,
|
||||
String orderBy) {
|
||||
String order) {
|
||||
super(key, startTs, endTs);
|
||||
this.interval = interval;
|
||||
this.limit = limit;
|
||||
this.aggregation = aggregation;
|
||||
this.orderBy = orderBy;
|
||||
this.order = order;
|
||||
}
|
||||
|
||||
public BaseReadTsKvQuery(String key, long startTs, long endTs) {
|
||||
this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC");
|
||||
}
|
||||
|
||||
public BaseReadTsKvQuery(String key, long startTs, long endTs, int limit, String orderBy) {
|
||||
this(key, startTs, endTs, endTs - startTs, limit, Aggregation.NONE, orderBy);
|
||||
public BaseReadTsKvQuery(String key, long startTs, long endTs, int limit, String order) {
|
||||
this(key, startTs, endTs, endTs - startTs, limit, Aggregation.NONE, order);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -23,6 +23,6 @@ public interface ReadTsKvQuery extends TsKvQuery {
|
||||
|
||||
Aggregation getAggregation();
|
||||
|
||||
String getOrderBy();
|
||||
String getOrder();
|
||||
|
||||
}
|
||||
|
||||
@ -27,13 +27,17 @@ public class EntityDataPageLink {
|
||||
private int page;
|
||||
private String textSearch;
|
||||
private EntityDataSortOrder sortOrder;
|
||||
private boolean dynamic = false;
|
||||
|
||||
public EntityDataPageLink() {
|
||||
}
|
||||
|
||||
public EntityDataPageLink(int pageSize, int page, String textSearch, EntityDataSortOrder sortOrder) {
|
||||
this(pageSize, page, textSearch, sortOrder, false);
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public EntityDataPageLink nextPageLink() {
|
||||
return new EntityDataPageLink(this.pageSize, this.page+1, this.textSearch, this.sortOrder);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -152,7 +152,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
|
||||
query.getEndTs(),
|
||||
PageRequest.of(0, query.getLimit(),
|
||||
Sort.by(Sort.Direction.fromString(
|
||||
query.getOrderBy()), "ts")));
|
||||
query.getOrder()), "ts")));
|
||||
tsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(query.getKey()));
|
||||
return Futures.immediateFuture(DaoUtil.convertDataList(tsKvEntities));
|
||||
}
|
||||
|
||||
@ -110,7 +110,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
|
||||
query.getEndTs(),
|
||||
PageRequest.of(0, query.getLimit(),
|
||||
Sort.by(Sort.Direction.fromString(
|
||||
query.getOrderBy()), "ts")));
|
||||
query.getOrder()), "ts")));
|
||||
timescaleTsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(strKey));
|
||||
return Futures.immediateFuture(DaoUtil.convertDataList(timescaleTsKvEntities));
|
||||
}
|
||||
|
||||
@ -170,7 +170,7 @@ public class BaseTimeseriesService implements TimeseriesService {
|
||||
} else {
|
||||
endTs = query.getEndTs();
|
||||
}
|
||||
return new BaseReadTsKvQuery(query.getKey(), startTs, endTs, query.getInterval(), query.getLimit(), query.getAggregation(), query.getOrderBy());
|
||||
return new BaseReadTsKvQuery(query.getKey(), startTs, endTs, query.getInterval(), query.getLimit(), query.getAggregation(), query.getOrder());
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
||||
@ -168,7 +168,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
|
||||
while (stepTs < query.getEndTs()) {
|
||||
long startTs = stepTs;
|
||||
long endTs = stepTs + step;
|
||||
ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy());
|
||||
ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrder());
|
||||
futures.add(findAndAggregateAsync(tenantId, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
|
||||
stepTs = endTs;
|
||||
}
|
||||
|
||||
@ -40,7 +40,7 @@ public class TsKvQueryCursor extends QueryCursor {
|
||||
|
||||
public TsKvQueryCursor(String entityType, UUID entityId, ReadTsKvQuery baseQuery, List<Long> partitions) {
|
||||
super(entityType, entityId, baseQuery, partitions);
|
||||
this.orderBy = baseQuery.getOrderBy();
|
||||
this.orderBy = baseQuery.getOrder();
|
||||
this.partitionIndex = isDesc() ? partitions.size() - 1 : 0;
|
||||
this.data = new ArrayList<>();
|
||||
this.currentLimit = baseQuery.getLimit();
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user