WS API Improvements

This commit is contained in:
Andrew Shvayka 2020-06-25 14:55:58 +03:00
parent 0533416982
commit 8cba4400df
5 changed files with 41 additions and 39 deletions

View File

@ -124,7 +124,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
@Override @Override
public void addSubscription(TbSubscription subscription, TbCallback callback) { public void addSubscription(TbSubscription subscription, TbCallback callback) {
log.trace("[{}][{}][{}] Registering remote subscription for entity [{}]", log.trace("[{}][{}][{}] Registering subscription for entity [{}]",
subscription.getServiceId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId()); subscription.getServiceId(), subscription.getSessionId(), subscription.getSubscriptionId(), subscription.getEntityId());
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId()); TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, subscription.getTenantId(), subscription.getEntityId());
if (currentPartitions.contains(tpi)) { if (currentPartitions.contains(tpi)) {

View File

@ -145,6 +145,9 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
if (wsCallBackExecutor != null) { if (wsCallBackExecutor != null) {
wsCallBackExecutor.shutdownNow(); wsCallBackExecutor.shutdownNow();
} }
if (scheduler != null) {
scheduler.shutdownNow();
}
} }
@Override @Override
@ -230,11 +233,12 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
private void refreshDynamicQuery(TenantId tenantId, CustomerId customerId, TbEntityDataSubCtx finalCtx) { private void refreshDynamicQuery(TenantId tenantId, CustomerId customerId, TbEntityDataSubCtx finalCtx) {
try { try {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
Collection<Integer> oldSubIds = finalCtx.update(entityService.findEntityDataByQuery(tenantId, customerId, finalCtx.getQuery())); TbEntityDataSubCtx.TbEntityDataSubCtxUpdateResult result = finalCtx.update(entityService.findEntityDataByQuery(tenantId, customerId, finalCtx.getQuery()));
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
dynamicQueryInvocationCnt.incrementAndGet(); dynamicQueryInvocationCnt.incrementAndGet();
dynamicQueryTimeSpent.addAndGet(end - start); dynamicQueryTimeSpent.addAndGet(end - start);
oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId)); result.getSubsToCancel().forEach(subId -> localSubscriptionService.cancelSubscription(finalCtx.getSessionId(), subId));
result.getSubsToAdd().forEach(localSubscriptionService::addSubscription);
} catch (Exception e) { } catch (Exception e) {
log.warn("[{}][{}] Failed to refresh query", finalCtx.getSessionId(), finalCtx.getCmdId(), e); log.warn("[{}][{}] Failed to refresh query", finalCtx.getSessionId(), finalCtx.getCmdId(), e);
} }
@ -340,21 +344,6 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
}, wsCallBackExecutor); }, wsCallBackExecutor);
} }
private List<ReadTsKvQuery> getReadTsKvQueries(GetTsCmd cmd) {
List<ReadTsKvQuery> finalTsKvQueryList;
List<ReadTsKvQuery> queries = cmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(),
getLimit(cmd.getLimit()), cmd.getAgg())).collect(Collectors.toList());
if (cmd.isFetchLatestPreviousPoint()) {
finalTsKvQueryList = new ArrayList<>(queries);
finalTsKvQueryList.addAll(cmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(
key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365), cmd.getStartTs(), cmd.getInterval(), 1, cmd.getAgg()
)).collect(Collectors.toList()));
} else {
finalTsKvQueryList = queries;
}
return finalTsKvQueryList;
}
private void handleLatestCmd(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) { private void handleLatestCmd(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) {
log.trace("[{}][{}] Going to process latest command: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd); log.trace("[{}][{}] Going to process latest command: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd);
//Fetch the latest values for telemetry keys (in case they are not copied from NoSQL to SQL DB in hybrid mode. //Fetch the latest values for telemetry keys (in case they are not copied from NoSQL to SQL DB in hybrid mode.

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.server.service.subscription; package org.thingsboard.server.service.subscription;
import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.CustomerId;
@ -62,7 +63,6 @@ public class TbEntityDataSubCtx {
private TimeSeriesCmd tsCmd; private TimeSeriesCmd tsCmd;
private PageData<EntityData> data; private PageData<EntityData> data;
private boolean initialDataSent; private boolean initialDataSent;
private List<TbSubscription> tbSubs;
private Map<Integer, EntityId> subToEntityIdMap; private Map<Integer, EntityId> subToEntityIdMap;
private volatile ScheduledFuture<?> refreshTask; private volatile ScheduledFuture<?> refreshTask;
private TimeSeriesCmd curTsCmd; private TimeSeriesCmd curTsCmd;
@ -93,10 +93,10 @@ public class TbEntityDataSubCtx {
public List<TbSubscription> createSubscriptions(List<EntityKey> keys, boolean resultToLatestValues) { public List<TbSubscription> createSubscriptions(List<EntityKey> keys, boolean resultToLatestValues) {
this.subToEntityIdMap = new HashMap<>(); this.subToEntityIdMap = new HashMap<>();
tbSubs = new ArrayList<>(); List<TbSubscription> tbSubs = new ArrayList<>();
Map<EntityKeyType, List<EntityKey>> keysByType = getEntityKeyByTypeMap(keys); Map<EntityKeyType, List<EntityKey>> keysByType = getEntityKeyByTypeMap(keys);
for (EntityData entityData : data.getData()) { for (EntityData entityData : data.getData()) {
addSubscription(entityData, keysByType, resultToLatestValues); tbSubs.addAll(addSubscriptions(entityData, keysByType, resultToLatestValues));
} }
return tbSubs; return tbSubs;
} }
@ -107,33 +107,35 @@ public class TbEntityDataSubCtx {
return keysByType; return keysByType;
} }
private void addSubscription(EntityData entityData, Map<EntityKeyType, List<EntityKey>> keysByType, boolean resultToLatestValues) { private List<TbSubscription> addSubscriptions(EntityData entityData, Map<EntityKeyType, List<EntityKey>> keysByType, boolean resultToLatestValues) {
List<TbSubscription> subscriptionList = new ArrayList<>();
keysByType.forEach((keysType, keysList) -> { keysByType.forEach((keysType, keysList) -> {
int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet(); int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet();
subToEntityIdMap.put(subIdx, entityData.getEntityId()); subToEntityIdMap.put(subIdx, entityData.getEntityId());
switch (keysType) { switch (keysType) {
case TIME_SERIES: case TIME_SERIES:
tbSubs.add(createTsSub(entityData, subIdx, keysList, resultToLatestValues)); subscriptionList.add(createTsSub(entityData, subIdx, keysList, resultToLatestValues));
break; break;
case CLIENT_ATTRIBUTE: case CLIENT_ATTRIBUTE:
tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.CLIENT_SCOPE, keysList)); subscriptionList.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.CLIENT_SCOPE, keysList));
break; break;
case SHARED_ATTRIBUTE: case SHARED_ATTRIBUTE:
tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SHARED_SCOPE, keysList)); subscriptionList.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SHARED_SCOPE, keysList));
break; break;
case SERVER_ATTRIBUTE: case SERVER_ATTRIBUTE:
tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SERVER_SCOPE, keysList)); subscriptionList.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.SERVER_SCOPE, keysList));
break; break;
case ATTRIBUTE: case ATTRIBUTE:
tbSubs.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.ANY_SCOPE, keysList)); subscriptionList.add(createAttrSub(entityData, subIdx, keysType, TbAttributeSubscriptionScope.ANY_SCOPE, keysList));
break; break;
} }
}); });
return subscriptionList;
} }
private TbSubscription createAttrSub(EntityData entityData, int subIdx, EntityKeyType keysType, TbAttributeSubscriptionScope scope, List<EntityKey> subKeys) { private TbSubscription createAttrSub(EntityData entityData, int subIdx, EntityKeyType keysType, TbAttributeSubscriptionScope scope, List<EntityKey> subKeys) {
Map<String, Long> keyStates = buildKeyStats(entityData, keysType, subKeys); Map<String, Long> keyStates = buildKeyStats(entityData, keysType, subKeys);
log.trace("[{}][{}][{}] Creating attributes subscription with keys: {}", serviceId, cmdId, subIdx, keyStates); log.trace("[{}][{}][{}] Creating attributes subscription for [{}] with keys: {}", serviceId, cmdId, subIdx, entityData.getEntityId(), keyStates);
return TbAttributeSubscription.builder() return TbAttributeSubscription.builder()
.serviceId(serviceId) .serviceId(serviceId)
.sessionId(sessionRef.getSessionId()) .sessionId(sessionRef.getSessionId())
@ -156,7 +158,7 @@ public class TbEntityDataSubCtx {
keyStates.put(k, ts); keyStates.put(k, ts);
}); });
} }
log.trace("[{}][{}][{}] Creating time-series subscription with keys: {}", serviceId, cmdId, subIdx, keyStates); log.trace("[{}][{}][{}] Creating time-series subscription for [{}] with keys: {}", serviceId, cmdId, subIdx, entityData.getEntityId(), keyStates);
return TbTimeseriesSubscription.builder() return TbTimeseriesSubscription.builder()
.serviceId(serviceId) .serviceId(serviceId)
.sessionId(sessionRef.getSessionId()) .sessionId(sessionRef.getSessionId())
@ -304,7 +306,7 @@ public class TbEntityDataSubCtx {
} }
} }
public Collection<Integer> update(PageData<EntityData> newData) { public TbEntityDataSubCtxUpdateResult update(PageData<EntityData> newData) {
Map<EntityId, EntityData> oldDataMap; Map<EntityId, EntityData> oldDataMap;
if (data != null && !data.getData().isEmpty()) { if (data != null && !data.getData().isEmpty()) {
oldDataMap = data.getData().stream().collect(Collectors.toMap(EntityData::getEntityId, Function.identity())); oldDataMap = data.getData().stream().collect(Collectors.toMap(EntityData::getEntityId, Function.identity()));
@ -314,20 +316,21 @@ public class TbEntityDataSubCtx {
Map<EntityId, EntityData> newDataMap = newData.getData().stream().collect(Collectors.toMap(EntityData::getEntityId, Function.identity())); Map<EntityId, EntityData> newDataMap = newData.getData().stream().collect(Collectors.toMap(EntityData::getEntityId, Function.identity()));
if (oldDataMap.size() == newDataMap.size() && oldDataMap.keySet().equals(newDataMap.keySet())) { if (oldDataMap.size() == newDataMap.size() && oldDataMap.keySet().equals(newDataMap.keySet())) {
log.trace("[{}][{}] No updates to entity data found", sessionRef.getSessionId(), cmdId); log.trace("[{}][{}] No updates to entity data found", sessionRef.getSessionId(), cmdId);
return Collections.emptyList(); return TbEntityDataSubCtxUpdateResult.EMPTY;
} else { } else {
this.data = newData; this.data = newData;
List<Integer> subIdsToRemove = new ArrayList<>(); List<Integer> subIdsToCancel = new ArrayList<>();
List<TbSubscription> subsToAdd = new ArrayList<>();
Set<EntityId> currentSubs = new HashSet<>(); Set<EntityId> currentSubs = new HashSet<>();
subToEntityIdMap.forEach((subId, entityId) -> { subToEntityIdMap.forEach((subId, entityId) -> {
if (!newDataMap.containsKey(entityId)) { if (!newDataMap.containsKey(entityId)) {
subIdsToRemove.add(subId); subIdsToCancel.add(subId);
} else { } else {
currentSubs.add(entityId); currentSubs.add(entityId);
} }
}); });
log.trace("[{}][{}] Subscriptions that are invalid: {}", sessionRef.getSessionId(), cmdId, subIdsToRemove); log.trace("[{}][{}] Subscriptions that are invalid: {}", sessionRef.getSessionId(), cmdId, subIdsToCancel);
subIdsToRemove.forEach(subToEntityIdMap::remove); subIdsToCancel.forEach(subToEntityIdMap::remove);
List<EntityData> newSubsList = newDataMap.entrySet().stream().filter(entry -> !currentSubs.contains(entry.getKey())).map(Map.Entry::getValue).collect(Collectors.toList()); List<EntityData> newSubsList = newDataMap.entrySet().stream().filter(entry -> !currentSubs.contains(entry.getKey())).map(Map.Entry::getValue).collect(Collectors.toList());
if (!newSubsList.isEmpty()) { if (!newSubsList.isEmpty()) {
boolean resultToLatestValues; boolean resultToLatestValues;
@ -346,13 +349,13 @@ public class TbEntityDataSubCtx {
newSubsList.forEach( newSubsList.forEach(
entity -> { entity -> {
log.trace("[{}][{}] Found new subscription for entity: {}", sessionRef.getSessionId(), cmdId, entity.getEntityId()); log.trace("[{}][{}] Found new subscription for entity: {}", sessionRef.getSessionId(), cmdId, entity.getEntityId());
addSubscription(entity, keysByType, resultToLatestValues); subsToAdd.addAll(addSubscriptions(entity, keysByType, resultToLatestValues));
} }
); );
} }
} }
wsService.sendWsMsg(sessionRef.getSessionId(), new EntityDataUpdate(cmdId, data, null)); wsService.sendWsMsg(sessionRef.getSessionId(), new EntityDataUpdate(cmdId, data, null));
return subIdsToRemove; return new TbEntityDataSubCtxUpdateResult(subIdsToCancel, subsToAdd);
} }
} }
@ -360,4 +363,14 @@ public class TbEntityDataSubCtx {
curTsCmd = cmd.getTsCmd(); curTsCmd = cmd.getTsCmd();
latestValueCmd = cmd.getLatestCmd(); latestValueCmd = cmd.getLatestCmd();
} }
@Data
@AllArgsConstructor
public static class TbEntityDataSubCtxUpdateResult {
private static TbEntityDataSubCtxUpdateResult EMPTY = new TbEntityDataSubCtxUpdateResult(Collections.emptyList(), Collections.emptyList());
private List<Integer> subsToCancel;
private List<TbSubscription> subsToAdd;
}
} }

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,