From 5e2667d3dec644d48e4fb45084b0fbf939b8fe2a Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Sun, 1 Apr 2018 00:20:05 +0300 Subject: [PATCH] Implementation of Ts Websocket Service --- .../server/actors/ActorSystemContext.java | 26 +- .../actors/ruleChain/DefaultTbContext.java | 48 ++- .../controller/HttpValidationCallback.java | 32 ++ .../controller/TelemetryController.java | 2 + .../service/security/AccessValidator.java | 18 +- .../security}/ValidationCallback.java | 29 +- .../DefaultTelemetrySubscriptionService.java | 30 +- .../DefaultTelemetryWebSocketService.java | 312 +++++++++++++++++- .../TelemetrySubscriptionService.java | 7 + .../rule/engine/api/TbContext.java | 21 +- 10 files changed, 462 insertions(+), 63 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/controller/HttpValidationCallback.java rename application/src/main/java/org/thingsboard/server/{controller => service/security}/ValidationCallback.java (59%) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 9e02946574..3cc211c26e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -61,12 +61,17 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService; import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; import org.thingsboard.server.service.component.ComponentDiscoveryService; +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.nio.charset.StandardCharsets; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @Slf4j @Component @@ -156,6 +161,10 @@ public class ActorSystemContext { @Getter private AuditLogService auditLogService; + @Autowired + @Getter + private TelemetrySubscriptionService tsSubService; + @Autowired @Getter @Setter @@ -224,6 +233,21 @@ public class ActorSystemContext { @Getter private final Config config; + @Getter + private ExecutorService tsCallBackExecutor; + + @PostConstruct + public void initExecutor() { + tsCallBackExecutor = Executors.newSingleThreadExecutor(); + } + + @PreDestroy + public void shutdownExecutor() { + if (tsCallBackExecutor != null) { + tsCallBackExecutor.shutdownNow(); + } + } + public ActorSystemContext() { config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load()); } @@ -345,7 +369,7 @@ public class ActorSystemContext { return Exception.class.isInstance(error) ? (Exception) error : new Exception(error); } - public ListeningExecutor getExecutor() { + public ListeningExecutor getJsExecutor() { //TODO: take thread count from yml. return new JsExecutorService(1); } diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 1d45b61dc3..b3b00725d5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,12 +15,18 @@ */ package org.thingsboard.server.actors.ruleChain; -import akka.actor.ActorContext; import akka.actor.ActorRef; +import com.google.common.base.Function; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.rule.engine.api.ListeningExecutor; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.dao.alarm.AlarmService; @@ -35,6 +41,8 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; import scala.concurrent.duration.Duration; +import javax.annotation.Nullable; +import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -43,6 +51,7 @@ import java.util.concurrent.TimeUnit; */ class DefaultTbContext implements TbContext { + private static final Function, ? extends Void> LIST_VOID_FUNCTION = v -> null; private final ActorSystemContext mainCtx; private final RuleNodeCtx nodeCtx; @@ -112,9 +121,36 @@ class DefaultTbContext implements TbContext { relationTypes.forEach(type -> tellNext(msg, type)); } + @Override + public void saveAndNotify(EntityId entityId, List ts, FutureCallback callback) { + saveAndNotify(entityId, ts, 0L, callback); + } + + @Override + public void saveAndNotify(EntityId entityId, List ts, long ttl, FutureCallback callback) { + ListenableFuture> saveFuture = mainCtx.getTsService().save(entityId, ts, ttl); + Futures.addCallback(saveFuture, new FutureCallback>() { + @Override + public void onSuccess(@Nullable List result) { + mainCtx.getTsSubService().onLocalTimeseriesUpdate(entityId, ts); + callback.onSuccess(null); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }, mainCtx.getTsCallBackExecutor()); + } + + @Override + public void saveAndNotify(EntityId entityId, String scope, Set attributes, FutureCallback callback) { + + } + @Override public ListeningExecutor getJsExecutor() { - return mainCtx.getExecutor(); + return mainCtx.getJsExecutor(); } @Override diff --git a/application/src/main/java/org/thingsboard/server/controller/HttpValidationCallback.java b/application/src/main/java/org/thingsboard/server/controller/HttpValidationCallback.java new file mode 100644 index 0000000000..fb1f3e7bee --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/controller/HttpValidationCallback.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.controller; + +import com.google.common.util.concurrent.FutureCallback; +import org.springframework.http.ResponseEntity; +import org.springframework.web.context.request.async.DeferredResult; +import org.thingsboard.server.service.security.ValidationCallback; + +/** + * Created by ashvayka on 21.02.17. + */ +public class HttpValidationCallback extends ValidationCallback> { + + public HttpValidationCallback(DeferredResult response, FutureCallback> action) { + super(response, action); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index a2867cecdc..c2a2df841c 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -69,6 +69,7 @@ import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import javax.annotation.Nullable; +import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Arrays; @@ -101,6 +102,7 @@ public class TelemetryController extends BaseController { private ExecutorService executor; + @PostConstruct public void initExecutor() { executor = Executors.newSingleThreadExecutor(); } diff --git a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java index 01bd23869d..02e12fe53f 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java +++ b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java @@ -22,7 +22,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.rule.RuleChain; -import org.thingsboard.server.controller.ValidationCallback; +import org.thingsboard.server.controller.HttpValidationCallback; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.customer.CustomerService; @@ -91,7 +91,7 @@ public class AccessValidator { final DeferredResult response = new DeferredResult<>(); - validate(currentUser, entityId, new ValidationCallback(response, + validate(currentUser, entityId, new HttpValidationCallback(response, new FutureCallback>() { @Override public void onSuccess(@Nullable DeferredResult result) { @@ -107,7 +107,7 @@ public class AccessValidator { return response; } - public void validate(SecurityUser currentUser, EntityId entityId, ValidationCallback callback) { + public void validate(SecurityUser currentUser, EntityId entityId, FutureCallback callback) { switch (entityId.getEntityType()) { case DEVICE: validateDevice(currentUser, entityId, callback); @@ -130,7 +130,7 @@ public class AccessValidator { } } - private void validateDevice(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) { + private void validateDevice(final SecurityUser currentUser, EntityId entityId, FutureCallback callback) { if (currentUser.isSystemAdmin()) { callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION)); } else { @@ -151,7 +151,7 @@ public class AccessValidator { } } - private void validateAsset(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) { + private void validateAsset(final SecurityUser currentUser, EntityId entityId, FutureCallback callback) { if (currentUser.isSystemAdmin()) { callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION)); } else { @@ -173,7 +173,7 @@ public class AccessValidator { } - private void validateRuleChain(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) { + private void validateRuleChain(final SecurityUser currentUser, EntityId entityId, FutureCallback callback) { if (currentUser.isCustomerUser()) { callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION)); } else { @@ -194,7 +194,7 @@ public class AccessValidator { } } - private void validateCustomer(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) { + private void validateCustomer(final SecurityUser currentUser, EntityId entityId, FutureCallback callback) { if (currentUser.isSystemAdmin()) { callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION)); } else { @@ -215,7 +215,7 @@ public class AccessValidator { } } - private void validateTenant(final SecurityUser currentUser, EntityId entityId, ValidationCallback callback) { + private void validateTenant(final SecurityUser currentUser, EntityId entityId, FutureCallback callback) { if (currentUser.isCustomerUser()) { callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION)); } else if (currentUser.isSystemAdmin()) { @@ -234,7 +234,7 @@ public class AccessValidator { } } - private FutureCallback getCallback(ValidationCallback callback, Function transformer) { + private FutureCallback getCallback(FutureCallback callback, Function transformer) { return new FutureCallback() { @Override public void onSuccess(@Nullable T result) { diff --git a/application/src/main/java/org/thingsboard/server/controller/ValidationCallback.java b/application/src/main/java/org/thingsboard/server/service/security/ValidationCallback.java similarity index 59% rename from application/src/main/java/org/thingsboard/server/controller/ValidationCallback.java rename to application/src/main/java/org/thingsboard/server/service/security/ValidationCallback.java index 6b2718f568..7feae03516 100644 --- a/application/src/main/java/org/thingsboard/server/controller/ValidationCallback.java +++ b/application/src/main/java/org/thingsboard/server/service/security/ValidationCallback.java @@ -1,23 +1,6 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.controller; +package org.thingsboard.server.service.security; import com.google.common.util.concurrent.FutureCallback; -import org.springframework.http.ResponseEntity; -import org.springframework.web.context.request.async.DeferredResult; import org.thingsboard.server.actors.plugin.ValidationResult; import org.thingsboard.server.actors.plugin.ValidationResultCode; import org.thingsboard.server.extensions.api.exception.AccessDeniedException; @@ -26,14 +9,14 @@ import org.thingsboard.server.extensions.api.exception.InternalErrorException; import org.thingsboard.server.extensions.api.exception.UnauthorizedException; /** - * Created by ashvayka on 21.02.17. + * Created by ashvayka on 31.03.18. */ -public class ValidationCallback implements FutureCallback { +public class ValidationCallback implements FutureCallback { - private final DeferredResult response; - private final FutureCallback> action; + private final T response; + private final FutureCallback action; - public ValidationCallback(DeferredResult response, FutureCallback> action) { + public ValidationCallback(T response, FutureCallback action) { this.response = response; this.action = action; } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 359949eef9..81cdac93ed 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -5,8 +5,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription; +import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState; import java.util.HashMap; import java.util.List; @@ -23,13 +25,10 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @Autowired private TelemetryWebSocketService wsService; - private final Map> subscriptionsByEntityId = new HashMap<>(); private final Map> subscriptionsByWsSessionId = new HashMap<>(); - - @Override public void onAttributesUpdateFromServer(EntityId entityId, String scope, List attributes) { @@ -39,4 +38,29 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio public void onTimeseriesUpdateFromServer(EntityId entityId, List entries) { } + + @Override + public void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId) { + + } + + @Override + public void removeSubscription(String sessionId, int cmdId) { + + } + + @Override + public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) { + + } + + @Override + public void onLocalTimeseriesUpdate(EntityId entityId, Map> ts) { + + } + + @Override + public void onLocalAttributesUpdate(EntityId entityId, String scope, Set attributes) { + + } } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java index 6d6c33e65b..57dd93d79f 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java @@ -2,36 +2,46 @@ package org.thingsboard.server.service.telemetry; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.hazelcast.util.function.Consumer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; +import org.thingsboard.server.actors.plugin.ValidationResult; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; +import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseTsKvQuery; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.kv.TsKvQuery; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.extensions.api.exception.UnauthorizedException; -import org.thingsboard.server.extensions.api.plugins.PluginCallback; -import org.thingsboard.server.extensions.api.plugins.PluginContext; -import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef; import org.thingsboard.server.extensions.api.plugins.ws.SessionEvent; import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.AttributesSubscriptionCmd; +import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.GetHistoryCmd; import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.SubscriptionCmd; import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmd; import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TelemetryPluginCmdsWrapper; +import org.thingsboard.server.extensions.core.plugin.telemetry.cmd.TimeseriesSubscriptionCmd; import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode; import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState; import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType; import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate; +import org.thingsboard.server.service.security.AccessValidator; +import javax.annotation.Nullable; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -41,6 +51,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; /** @@ -50,6 +62,8 @@ import java.util.stream.Collectors; @Slf4j public class DefaultTelemetryWebSocketService implements TelemetryWebSocketService { + public static final int DEFAULT_LIMIT = 100; + public static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE; private static final int UNKNOWN_SUBSCRIPTION_ID = 0; private static final String PROCESSING_MSG = "[{}] Processing: {}"; private static final ObjectMapper jsonMapper = new ObjectMapper(); @@ -65,12 +79,29 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @Autowired private TelemetryWebSocketMsgEndpoint msgEndpoint; + @Autowired + private AccessValidator accessValidator; + @Autowired private AttributesService attributesService; @Autowired private TimeseriesService tsService; + private ExecutorService executor; + + @PostConstruct + public void initExecutor() { + executor = Executors.newSingleThreadExecutor(); + } + + @PreDestroy + public void shutdownExecutor() { + if (executor != null) { + executor.shutdownNow(); + } + } + @Override public void handleWebSocketSessionEvent(TelemetryWebSocketSessionRef sessionRef, SessionEvent event) { String sessionId = sessionRef.getSessionId(); @@ -169,44 +200,190 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi }; if (StringUtils.isEmpty(cmd.getScope())) { - //ValidationCallback? - ctx.loadAttributes(entityId, Arrays.asList(DataConstants.allScopes()), keys, callback); + accessValidator.validate(sessionRef.getSecurityCtx(), entityId, getAttributesFetchCallback(entityId, keys, callback)); } else { - ctx.loadAttributes(entityId, cmd.getScope(), keys, callback); + accessValidator.validate(sessionRef.getSecurityCtx(), entityId, getAttributesFetchCallback(entityId, cmd.getScope(), keys, callback)); } } - private void handleWsAttributesSubscription(PluginContext ctx, PluginWebsocketSessionRef sessionRef, - AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId) { - PluginCallback> callback = new PluginCallback>() { + private void handleWsHistoryCmd(TelemetryWebSocketSessionRef sessionRef, GetHistoryCmd cmd) { + String sessionId = sessionRef.getSessionId(); + WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId); + if (sessionMD == null) { + log.warn("[{}] Session meta data not found. ", sessionId); + SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, + SESSION_META_DATA_NOT_FOUND); + sendWsMsg(sessionRef, update); + return; + } + if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty() || cmd.getEntityType() == null || cmd.getEntityType().isEmpty()) { + SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, + "Device id is empty!"); + sendWsMsg(sessionRef, update); + return; + } + if (cmd.getKeys() == null || cmd.getKeys().isEmpty()) { + SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, + "Keys are empty!"); + sendWsMsg(sessionRef, update); + return; + } + EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId()); + List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); + List queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))) + .collect(Collectors.toList()); + + FutureCallback> callback = new FutureCallback>() { @Override - public void onSuccess(PluginContext ctx, List data) { + public void onSuccess(List data) { + sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data)); + } + + @Override + public void onFailure(Throwable e) { + SubscriptionUpdate update; + if (UnauthorizedException.class.isInstance(e)) { + update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED, + SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg()); + } else { + update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, + FAILED_TO_FETCH_DATA); + } + sendWsMsg(sessionRef, update); + } + }; + accessValidator.validate(sessionRef.getSecurityCtx(), entityId, + on(r -> Futures.addCallback(tsService.findAll(entityId, queries), callback, executor), callback::onFailure)); + } + + private void handleWsAttributesSubscription(TelemetryWebSocketSessionRef sessionRef, + AttributesSubscriptionCmd cmd, String sessionId, EntityId entityId) { + FutureCallback> callback = new FutureCallback>() { + @Override + public void onSuccess(List data) { List attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); - sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData)); + sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData)); Map subState = new HashMap<>(attributesData.size()); attributesData.forEach(v -> subState.put(v.getKey(), v.getTs())); SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope()); - subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub); + subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub); } @Override - public void onFailure(PluginContext ctx, Exception e) { + public void onFailure(Throwable e) { log.error(FAILED_TO_FETCH_ATTRIBUTES, e); SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, FAILED_TO_FETCH_ATTRIBUTES); - sendWsMsg(ctx, sessionRef, update); + sendWsMsg(sessionRef, update); } }; + if (StringUtils.isEmpty(cmd.getScope())) { - ctx.loadAttributes(entityId, Arrays.asList(DataConstants.allScopes()), callback); + accessValidator.validate(sessionRef.getSecurityCtx(), entityId, getAttributesFetchCallback(entityId, callback)); } else { - ctx.loadAttributes(entityId, cmd.getScope(), callback); + accessValidator.validate(sessionRef.getSecurityCtx(), entityId, getAttributesFetchCallback(entityId, cmd.getScope(), callback)); } } + private void handleWsTimeseriesSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, TimeseriesSubscriptionCmd cmd) { + String sessionId = sessionRef.getSessionId(); + log.debug("[{}] Processing: {}", sessionId, cmd); + + if (validateSessionMetadata(sessionRef, cmd, sessionId)) { + if (cmd.isUnsubscribe()) { + unsubscribe(sessionRef, cmd, sessionId); + } else if (validateSubscriptionCmd(sessionRef, cmd)) { + EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId()); + Optional> keysOptional = getKeys(cmd); + + if (keysOptional.isPresent()) { + handleWsTimeseriesSubscriptionByKeys(sessionRef, cmd, sessionId, entityId); + } else { + handleWsTimeseriesSubscription(sessionRef, cmd, sessionId, entityId); + } + } + } + } + + private void handleWsTimeseriesSubscriptionByKeys(TelemetryWebSocketSessionRef sessionRef, + TimeseriesSubscriptionCmd cmd, String sessionId, EntityId entityId) { + long startTs; + if (cmd.getTimeWindow() > 0) { + List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); + log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId); + startTs = cmd.getStartTs(); + long endTs = cmd.getStartTs() + cmd.getTimeWindow(); + List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(), + getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList()); + + final FutureCallback> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys); + accessValidator.validate(sessionRef.getSecurityCtx(), entityId, + on(r -> Futures.addCallback(tsService.findAll(entityId, queries), callback, executor), callback::onFailure)); + } else { + List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); + startTs = System.currentTimeMillis(); + log.debug("[{}] fetching latest timeseries data for keys: ({}) for device : {}", sessionId, cmd.getKeys(), entityId); + final FutureCallback> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys); + accessValidator.validate(sessionRef.getSecurityCtx(), entityId, + on(r -> Futures.addCallback(tsService.findLatest(entityId, keys), callback, executor), callback::onFailure)); + } + } + + private void handleWsTimeseriesSubscription(TelemetryWebSocketSessionRef sessionRef, + TimeseriesSubscriptionCmd cmd, String sessionId, EntityId entityId) { + FutureCallback> callback = new FutureCallback>() { + @Override + public void onSuccess(List data) { + sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data)); + Map subState = new HashMap<>(data.size()); + data.forEach(v -> subState.put(v.getKey(), v.getTs())); + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState, cmd.getScope()); + subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub); + } + + @Override + public void onFailure(Throwable e) { + SubscriptionUpdate update; + if (UnauthorizedException.class.isInstance(e)) { + update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED, + SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg()); + } else { + update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, + FAILED_TO_FETCH_DATA); + } + sendWsMsg(sessionRef, update); + } + }; + accessValidator.validate(sessionRef.getSecurityCtx(), entityId, + on(r -> Futures.addCallback(tsService.findAllLatest(entityId), callback, executor), callback::onFailure)); + } + + private FutureCallback> getSubscriptionCallback(final TelemetryWebSocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, final String sessionId, final EntityId entityId, final long startTs, final List keys) { + return new FutureCallback>() { + @Override + public void onSuccess(List data) { + sendWsMsg(sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data)); + + Map subState = new HashMap<>(keys.size()); + keys.forEach(key -> subState.put(key, startTs)); + data.forEach(v -> subState.put(v.getKey(), v.getTs())); + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState, cmd.getScope()); + subscriptionManager.addLocalWsSubscription(sessionId, entityId, sub); + } + + @Override + public void onFailure(Throwable e) { + log.error(FAILED_TO_FETCH_DATA, e); + SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, + FAILED_TO_FETCH_DATA); + sendWsMsg(sessionRef, update); + } + }; + } + private void unsubscribe(TelemetryWebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) { if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) { subscriptionManager.cleanupLocalWsSessionSubscriptions(sessionRef, sessionId); @@ -258,4 +435,105 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi } } + private ListenableFuture> mergeAllAttributesFutures(List>> futures) { + return Futures.transform(Futures.successfulAsList(futures), + (Function>, ? extends List>) input -> { + List tmp = new ArrayList<>(); + if (input != null) { + input.forEach(tmp::addAll); + } + return tmp; + }, executor); + } + + private FutureCallback getAttributesFetchCallback(final EntityId entityId, final List keys, final FutureCallback> callback) { + return new FutureCallback() { + @Override + public void onSuccess(@Nullable ValidationResult result) { + List>> futures = new ArrayList<>(); + for (String scope : DataConstants.allScopes()) { + futures.add(attributesService.find(entityId, scope, keys)); + } + + ListenableFuture> future = mergeAllAttributesFutures(futures); + Futures.addCallback(future, callback); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }; + } + + private FutureCallback getAttributesFetchCallback(final EntityId entityId, final String scope, final List keys, final FutureCallback> callback) { + return new FutureCallback() { + @Override + public void onSuccess(@Nullable ValidationResult result) { + Futures.addCallback(attributesService.find(entityId, scope, keys), callback); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }; + } + + private FutureCallback getAttributesFetchCallback(final EntityId entityId, final FutureCallback> callback) { + return new FutureCallback() { + @Override + public void onSuccess(@Nullable ValidationResult result) { + List>> futures = new ArrayList<>(); + for (String scope : DataConstants.allScopes()) { + futures.add(attributesService.findAll(entityId, scope)); + } + + ListenableFuture> future = mergeAllAttributesFutures(futures); + Futures.addCallback(future, callback); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }; + } + + private FutureCallback getAttributesFetchCallback(final EntityId entityId, final String scope, final FutureCallback> callback) { + return new FutureCallback() { + @Override + public void onSuccess(@Nullable ValidationResult result) { + Futures.addCallback(attributesService.findAll(entityId, scope), callback); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }; + } + + private FutureCallback on(Consumer success, Consumer failure) { + return new FutureCallback() { + @Override + public void onSuccess(@Nullable ValidationResult result) { + success.accept(result); + } + + @Override + public void onFailure(Throwable t) { + failure.accept(t); + } + }; + } + + + private static Aggregation getAggregation(String agg) { + return StringUtils.isEmpty(agg) ? DEFAULT_AGGREGATION : Aggregation.valueOf(agg); + } + + private int getLimit(int limit) { + return limit == 0 ? DEFAULT_LIMIT : limit; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java index 9673629e9d..44e8512559 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java @@ -2,10 +2,13 @@ package org.thingsboard.server.service.telemetry; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState; import java.util.List; +import java.util.Map; +import java.util.Set; /** * Created by ashvayka on 27.03.18. @@ -21,4 +24,8 @@ public interface TelemetrySubscriptionService { void removeSubscription(String sessionId, int cmdId); void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub); + + void onLocalTimeseriesUpdate(EntityId entityId, List ts); + + void onLocalAttributesUpdate(EntityId entityId, String scope, Set attributes); } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 46fa1d2087..6b8947037a 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,7 +15,12 @@ */ package org.thingsboard.rule.engine.api; +import com.google.common.util.concurrent.FutureCallback; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.dao.alarm.AlarmService; @@ -30,6 +35,8 @@ import org.thingsboard.server.dao.rule.RuleService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; @@ -56,6 +63,12 @@ public interface TbContext { void tellError(TbMsg msg, Throwable th); + void saveAndNotify(EntityId entityId, List ts, FutureCallback callback); + + void saveAndNotify(EntityId entityId, List ts, long ttl, FutureCallback callback); + + void saveAndNotify(EntityId entityId, String scope, Set attributes, FutureCallback callback); + RuleNodeId getSelfId(); AttributesService getAttributesService();