From 1e1b58f072372334b1c947c33f8c5ff0a63e29b1 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 22 Jun 2020 16:54:56 +0300 Subject: [PATCH] TMP commit --- ...efaultTbEntityDataSubscriptionService.java | 148 ++++++++++++------ .../subscription/TbEntityDataSubCtx.java | 46 ++++++ 2 files changed, 144 insertions(+), 50 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index a86c5c730d..91db84a980 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -84,7 +84,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc private static final int DEFAULT_LIMIT = 100; private final Set currentPartitions = ConcurrentHashMap.newKeySet(); - private final Map> subscriptionsBySessionId = new ConcurrentHashMap<>(); + private final Map> subscriptionsBySessionId = new ConcurrentHashMap<>(); @Autowired private TelemetryWebSocketService wsService; @@ -155,39 +155,87 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @Override public void handleCmd(TelemetryWebSocketSessionRef session, EntityDataCmd cmd) { - if (cmd.getHistoryCmd() != null) { - handleHistoryCmd(session, cmd.getCmdId(), cmd.getQuery(), cmd.getHistoryCmd()); - } else if (cmd.getLatestCmd() != null) { - handleLatestCmd(session, cmd.getCmdId(), cmd.getQuery(), cmd.getLatestCmd()); + TbEntityDataSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId()); + if (ctx != null) { + log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd); + //TODO: cleanup old subscription; } else { - handleTimeseriesCmd(session, cmd.getCmdId(), cmd.getQuery(), cmd.getTsCmd()); + log.debug("[{}][{}] Creating new subscription using: {}", session.getSessionId(), cmd.getCmdId(), cmd); + ctx = createSubCtx(session, cmd); + } + if (cmd.getQuery() != null) { + if (ctx.getQuery() == null) { + log.debug("[{}][{}] Initializing data using query: {}", session.getSessionId(), cmd.getCmdId(), cmd.getQuery()); + } else { + log.debug("[{}][{}] Updating data using query: {}", session.getSessionId(), cmd.getCmdId(), cmd.getQuery()); + } + ctx.setQuery(cmd.getQuery()); + TenantId tenantId = ctx.getTenantId(); + CustomerId customerId = ctx.getCustomerId(); + EntityDataQuery query = ctx.getQuery(); + //Step 1. Update existing query with the contents of LatestValueCmd + if (cmd.getLatestCmd() != null) { + cmd.getLatestCmd().getKeys().forEach(key -> { + if (!query.getLatestValues().contains(key)) { + query.getLatestValues().add(key); + } + }); + } + PageData data = entityService.findEntityDataByQuery(tenantId, customerId, ctx.getQuery()); + ctx.setData(data); + } + ListenableFuture historyFuture; + if (cmd.getHistoryCmd() != null) { + historyFuture = handleHistoryCmd(ctx, cmd.getHistoryCmd()); + } else { + historyFuture = Futures.immediateFuture(ctx); + } + if (cmd.getLatestCmd() != null) { + Futures.addCallback(historyFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable TbEntityDataSubCtx theCtx) { + handleLatestCmd(theCtx, cmd.getLatestCmd()); + } + + @Override + public void onFailure(Throwable t) { + log.warn("[{}][{}] Failed to process command", session.getSessionId(), cmd.getCmdId()); + } + }, wsCallBackExecutor); + } else if (cmd.getTsCmd() != null) { + handleTimeseriesCmd(ctx, cmd.getTsCmd()); } } - private void handleTimeseriesCmd(TelemetryWebSocketSessionRef session, int cmdId, EntityDataQuery query, TimeSeriesCmd tsCmd) { + private TbEntityDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, EntityDataCmd cmd) { + Map sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>()); + TbEntityDataSubCtx ctx = new TbEntityDataSubCtx(sessionRef, cmd.getCmdId()); + ctx.setQuery(cmd.getQuery()); + sessionSubs.put(cmd.getCmdId(), ctx); + return ctx; } - private void handleLatestCmd(TelemetryWebSocketSessionRef session, int cmdId, EntityDataQuery query, LatestValueCmd latestCmd) { - TenantId tenantId = session.getSecurityCtx().getTenantId(); - CustomerId customerId = session.getSecurityCtx().getCustomerId(); - //Step 1. Update existing query with the contents of LatestValueCmd - latestCmd.getKeys().forEach(key -> { - if (!query.getLatestValues().contains(key)) { - query.getLatestValues().add(key); - } - }); + private TbEntityDataSubCtx getSubCtx(String sessionId, int cmdId) { + Map sessionSubs = subscriptionsBySessionId.get(sessionId); + if (sessionSubs != null) { + return sessionSubs.get(cmdId); + } else { + return null; + } + } - //Step 2. Fetch the initial data - PageData data = entityService.findEntityDataByQuery(tenantId, customerId, query); + private void handleTimeseriesCmd(TbEntityDataSubCtx ctx, TimeSeriesCmd tsCmd) { + } - //Step 3. Fetch the latest values for telemetry keys (in case they are not copied from NoSQL to SQL DB in hybrid mode. + private void handleLatestCmd(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) { + //Fetch the latest values for telemetry keys (in case they are not copied from NoSQL to SQL DB in hybrid mode. if (!tsInSqlDB) { List allTsKeys = latestCmd.getKeys().stream() .filter(key -> key.getType().equals(EntityKeyType.TIME_SERIES)) .map(EntityKey::getKey).collect(Collectors.toList()); Map>> missingTelemetryFurutes = new HashMap<>(); - for (EntityData entityData : data.getData()) { + for (EntityData entityData : ctx.getData().getData()) { Map> latestEntityData = entityData.getLatest(); Map tsEntityData = latestEntityData.get(EntityKeyType.TIME_SERIES); Set missingTsKeys = new LinkedHashSet<>(allTsKeys); @@ -198,7 +246,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc latestEntityData.put(EntityKeyType.TIME_SERIES, tsEntityData); } - ListenableFuture> missingTsData = tsService.findLatest(tenantId, entityData.getEntityId(), missingTsKeys); + ListenableFuture> missingTsData = tsService.findLatest(ctx.getTenantId(), entityData.getEntityId(), missingTsKeys); missingTelemetryFurutes.put(entityData, Futures.transform(missingTsData, this::toTsValue, MoreExecutors.directExecutor())); } Futures.addCallback(Futures.allAsList(missingTelemetryFurutes.values()), new FutureCallback>>() { @@ -208,24 +256,31 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc try { key.getLatest().get(EntityKeyType.TIME_SERIES).putAll(value.get()); } catch (InterruptedException | ExecutionException e) { - log.warn("[{}][{}] Failed to lookup latest telemetry: {}:{}", session.getSessionId(), cmdId, key.getEntityId(), allTsKeys, e); + log.warn("[{}][{}] Failed to lookup latest telemetry: {}:{}", ctx.getSessionId(), ctx.getCmdId(), key.getEntityId(), allTsKeys, e); } }); - EntityDataUpdate update = new EntityDataUpdate(cmdId, data, null); - wsService.sendWsMsg(session.getSessionId(), update); + EntityDataUpdate update; + if (!ctx.isInitialDataSent()) { + update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null); + } else { + update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData()); + } + wsService.sendWsMsg(ctx.getSessionId(), update); //TODO: create context for this (session, cmdId) that contains query, latestCmd and update. Subscribe + periodic updates. } @Override public void onFailure(Throwable t) { - log.warn("[{}][{}] Failed to process websocket command: {}:{}", session.getSessionId(), cmdId, query, latestCmd, t); - wsService.sendWsMsg(session.getSessionId(), - new EntityDataUpdate(cmdId, SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to process websocket command!")); + log.warn("[{}][{}] Failed to process websocket command: {}:{}", ctx.getSessionId(), ctx.getCmdId(), ctx.getQuery(), latestCmd, t); + wsService.sendWsMsg(ctx.getSessionId(), + new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to process websocket command!")); } }, wsCallBackExecutor); } else { - EntityDataUpdate update = new EntityDataUpdate(cmdId, data, null); - wsService.sendWsMsg(session.getSessionId(), update); + if (!ctx.isInitialDataSent()) { + EntityDataUpdate update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null); + wsService.sendWsMsg(ctx.getSessionId(), update); + } //TODO: create context for this (session, cmdId) that contains query, latestCmd and update. Subscribe + periodic updates. } } @@ -234,17 +289,14 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc return data.stream().collect(Collectors.toMap(TsKvEntry::getKey, value -> new TsValue(value.getTs(), value.getValueAsString()))); } - private void handleHistoryCmd(TelemetryWebSocketSessionRef session, int cmdId, EntityDataQuery query, EntityHistoryCmd historyCmd) { - TenantId tenantId = session.getSecurityCtx().getTenantId(); - CustomerId customerId = session.getSecurityCtx().getCustomerId(); - PageData data = entityService.findEntityDataByQuery(tenantId, customerId, query); + private ListenableFuture handleHistoryCmd(TbEntityDataSubCtx ctx, EntityHistoryCmd historyCmd) { List tsKvQueryList = historyCmd.getKeys().stream().map(key -> new BaseReadTsKvQuery( key, historyCmd.getStartTs(), historyCmd.getEndTs(), historyCmd.getInterval(), getLimit(historyCmd.getLimit()), historyCmd.getAgg() )).collect(Collectors.toList()); Map>> fetchResultMap = new HashMap<>(); - data.getData().forEach(entityData -> fetchResultMap.put(entityData, - tsService.findAll(tenantId, entityData.getEntityId(), tsKvQueryList))); - Futures.allAsList(fetchResultMap.values()).addListener(() -> { + ctx.getData().getData().forEach(entityData -> fetchResultMap.put(entityData, + tsService.findAll(ctx.getTenantId(), entityData.getEntityId(), tsKvQueryList))); + return Futures.transform(Futures.allAsList(fetchResultMap.values()), f -> { fetchResultMap.forEach((entityData, future) -> { Map> keyData = new LinkedHashMap<>(); historyCmd.getKeys().forEach(key -> keyData.put(key, new ArrayList<>())); @@ -255,20 +307,21 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc } keyData.forEach((k, v) -> entityData.getTimeseries().put(k, v.toArray(new TsValue[v.size()]))); } catch (InterruptedException | ExecutionException e) { - log.warn("[{}][{}][{}] Failed to fetch historical data", session.getSessionId(), cmdId, entityData.getEntityId(), e); - wsService.sendWsMsg(session.getSessionId(), - new EntityDataUpdate(cmdId, SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!")); + log.warn("[{}][{}][{}] Failed to fetch historical data", ctx.getSessionId(), ctx.getCmdId(), entityData.getEntityId(), e); + wsService.sendWsMsg(ctx.getSessionId(), + new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!")); } }); - EntityDataUpdate update = new EntityDataUpdate(cmdId, data, null); - wsService.sendWsMsg(session.getSessionId(), update); + EntityDataUpdate update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null); + wsService.sendWsMsg(ctx.getSessionId(), update); + ctx.setInitialDataSent(true); + return ctx; }, wsCallBackExecutor); } - @Override - public void cancelSubscription(String sessionId, EntityDataUnsubscribeCmd subscriptionId) { - + public void cancelSubscription(String sessionId, EntityDataUnsubscribeCmd cmd) { + TbEntityDataSubCtx ctx = getSubCtx(sessionId, cmd.getCmdId()); } // //TODO 3.1: replace null callbacks with callbacks from websocket service. @@ -377,11 +430,6 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc .keyStates(keyStates).build(); } - private void registerSubscription(TbSubscription subscription) { - Map sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>()); - sessionSubscriptions.put(subscription.getSubscriptionId(), subscription); - } - private int getLimit(int limit) { return limit == 0 ? DEFAULT_LIMIT : limit; } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java new file mode 100644 index 0000000000..36ee86a116 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java @@ -0,0 +1,46 @@ +package org.thingsboard.server.service.subscription; + +import lombok.Data; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.query.EntityData; +import org.thingsboard.server.common.data.query.EntityDataQuery; +import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; +import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd; +import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd; + +@Data +public class TbEntityDataSubCtx { + + private final TelemetryWebSocketSessionRef sessionRef; + private final int cmdId; + private EntityDataQuery query; + private LatestValueCmd latestCmd; + private TimeSeriesCmd tsCmd; + private PageData data; + private boolean initialDataSent; + + public TbEntityDataSubCtx(TelemetryWebSocketSessionRef sessionRef, int cmdId) { + this.sessionRef = sessionRef; + this.cmdId = cmdId; + } + + public String getSessionId() { + return sessionRef.getSessionId(); + } + + public TenantId getTenantId() { + return sessionRef.getSecurityCtx().getTenantId(); + } + + public CustomerId getCustomerId() { + return sessionRef.getSecurityCtx().getCustomerId(); + } + + + public void setData(PageData data) { + this.data = data; + } + +}