From 301f106e540213e6dcfc2514531f5ccdcc83a555 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Tue, 21 Feb 2017 19:29:00 +0200 Subject: [PATCH] Major refactoring of plugin context --- .../plugin/PluginProcessingContext.java | 236 +++++++++--------- .../actors/plugin/ValidationCallback.java | 49 ++++ .../server/dao/AbstractModelDao.java | 34 +++ .../java/org/thingsboard/server/dao/Dao.java | 3 + .../server/dao/device/DeviceServiceImpl.java | 11 + .../api/exception/UnauthorizedException.java | 22 ++ .../extensions/api/plugins/PluginContext.java | 10 +- .../rpc/handlers/RpcRestMsgHandler.java | 42 ++-- .../handlers/TelemetryRestMsgHandler.java | 8 +- .../handlers/TelemetryRuleMsgHandler.java | 2 +- .../TelemetryWebsocketMsgHandler.java | 45 ++-- 11 files changed, 294 insertions(+), 168 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java create mode 100644 extensions-api/src/main/java/org/thingsboard/server/extensions/api/exception/UnauthorizedException.java diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java index 98149c296a..b102226921 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -90,133 +90,115 @@ public final class PluginProcessingContext implements PluginContext { } @Override - public void saveAttributes(DeviceId deviceId, String scope, List attributes, PluginCallback callback) { - validate(deviceId); - - ListenableFuture> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes); - Futures.addCallback(rsListFuture, getListCallback(callback, v -> { - onDeviceAttributesChanged(deviceId, scope, attributes); - return null; - }), executor); + public void saveAttributes(final TenantId tenantId, final DeviceId deviceId, final String scope, final List attributes, final PluginCallback callback) { + validate(deviceId, new ValidationCallback(callback, ctx -> { + ListenableFuture> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes); + Futures.addCallback(rsListFuture, getListCallback(callback, v -> { + onDeviceAttributesChanged(tenantId, deviceId, scope, attributes); + return null; + }), executor); + })); } @Override - public void removeAttributes(DeviceId deviceId, String scope, List keys, PluginCallback callback) { - validate(deviceId); - ListenableFuture> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys); - Futures.addCallback(future, getCallback(callback, v -> null), executor); - onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet())); + public void removeAttributes(final TenantId tenantId, final DeviceId deviceId, final String scope, final List keys, final PluginCallback callback) { + validate(deviceId, new ValidationCallback(callback, ctx -> { + ListenableFuture> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys); + Futures.addCallback(future, getCallback(callback, v -> null), executor); + onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet())); + })); } @Override - public void saveAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List attributes, PluginCallback callback) { - validate(deviceId); - - ListenableFuture> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes); - Futures.addCallback(rsListFuture, getListCallback(callback, v -> { - onDeviceAttributesChanged(tenantId, deviceId, scope, attributes); - return null; - }), executor); + public void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, final PluginCallback> callback) { + validate(deviceId, new ValidationCallback(callback, ctx -> { + ListenableFuture> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey); + Futures.addCallback(future, getCallback(callback, v -> v), executor); + })); } @Override - public void removeAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List keys, PluginCallback callback) { - validate(deviceId); - ListenableFuture> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys); - Futures.addCallback(future, getCallback(callback, v -> null), executor); - onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet())); - } - - @Override - public void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, PluginCallback> callback) { - validate(deviceId); - ListenableFuture> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey); - Futures.addCallback(future, getCallback(callback, v -> v), executor); - } - - @Override - public void loadAttributes(DeviceId deviceId, String attributeType, Collection attributeKeys, PluginCallback> callback) { - validate(deviceId); - ListenableFuture> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys); - Futures.addCallback(future, getCallback(callback, v -> v), executor); + public void loadAttributes(DeviceId deviceId, String attributeType, Collection attributeKeys, final PluginCallback> callback) { + validate(deviceId, new ValidationCallback(callback, ctx -> { + ListenableFuture> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys); + Futures.addCallback(future, getCallback(callback, v -> v), executor); + })); } @Override public void loadAttributes(DeviceId deviceId, String attributeType, PluginCallback> callback) { - validate(deviceId); - ListenableFuture> future = pluginCtx.attributesService.findAll(deviceId, attributeType); - Futures.addCallback(future, getCallback(callback, v -> v), executor); + validate(deviceId, new ValidationCallback(callback, ctx -> { + ListenableFuture> future = pluginCtx.attributesService.findAll(deviceId, attributeType); + Futures.addCallback(future, getCallback(callback, v -> v), executor); + })); } @Override - public void loadAttributes(DeviceId deviceId, Collection attributeTypes, PluginCallback> callback) { - validate(deviceId); - List>> futures = new ArrayList<>(); - attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.findAll(deviceId, attributeType))); - convertFuturesAndAddCallback(callback, futures); + public void loadAttributes(final DeviceId deviceId, final Collection attributeTypes, final PluginCallback> callback) { + validate(deviceId, new ValidationCallback(callback, ctx -> { + List>> futures = new ArrayList<>(); + attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.findAll(deviceId, attributeType))); + convertFuturesAndAddCallback(callback, futures); + })); } @Override - public void loadAttributes(DeviceId deviceId, Collection attributeTypes, Collection attributeKeys, PluginCallback> callback) { - validate(deviceId); - List>> futures = new ArrayList<>(); - attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys))); - convertFuturesAndAddCallback(callback, futures); - } - - private void convertFuturesAndAddCallback(PluginCallback> callback, List>> futures) { - ListenableFuture> future = Futures.transform(Futures.successfulAsList(futures), - (Function>, ? extends List>) input -> { - List result = new ArrayList<>(); - input.forEach(r -> result.addAll(r)); - return result; - }, executor); - Futures.addCallback(future, getCallback(callback, v -> v), executor); + public void loadAttributes(final DeviceId deviceId, final Collection attributeTypes, final Collection attributeKeys, final PluginCallback> callback) { + validate(deviceId, new ValidationCallback(callback, ctx -> { + List>> futures = new ArrayList<>(); + attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys))); + convertFuturesAndAddCallback(callback, futures); + })); } @Override - public void saveTsData(DeviceId deviceId, TsKvEntry entry, PluginCallback callback) { - validate(deviceId); - ListenableFuture> rsListFuture = pluginCtx.tsService.save(DataConstants.DEVICE, deviceId, entry); - Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor); + public void saveTsData(final DeviceId deviceId, final TsKvEntry entry, final PluginCallback callback) { + validate(deviceId, new ValidationCallback(callback, ctx -> { + ListenableFuture> rsListFuture = pluginCtx.tsService.save(DataConstants.DEVICE, deviceId, entry); + Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor); + })); } @Override - public void saveTsData(DeviceId deviceId, List entries, PluginCallback callback) { - validate(deviceId); - ListenableFuture> rsListFuture = pluginCtx.tsService.save(DataConstants.DEVICE, deviceId, entries); - Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor); + public void saveTsData(final DeviceId deviceId, final List entries, final PluginCallback callback) { + validate(deviceId, new ValidationCallback(callback, ctx -> { + ListenableFuture> rsListFuture = pluginCtx.tsService.save(DataConstants.DEVICE, deviceId, entries); + Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor); + })); } @Override - public void loadTimeseries(DeviceId deviceId, List queries, PluginCallback> callback) { - validate(deviceId); - ListenableFuture> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, queries); - Futures.addCallback(future, getCallback(callback, v -> v), executor); + public void loadTimeseries(final DeviceId deviceId, final List queries, final PluginCallback> callback) { + validate(deviceId, new ValidationCallback(callback, ctx -> { + ListenableFuture> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, queries); + Futures.addCallback(future, getCallback(callback, v -> v), executor); + })); } @Override - public void loadLatestTimeseries(DeviceId deviceId, PluginCallback> callback) { - validate(deviceId); - ResultSetFuture future = pluginCtx.tsService.findAllLatest(DataConstants.DEVICE, deviceId); - Futures.addCallback(future, getCallback(callback, pluginCtx.tsService::convertResultSetToTsKvEntryList), executor); + public void loadLatestTimeseries(final DeviceId deviceId, final PluginCallback> callback) { + validate(deviceId, new ValidationCallback(callback, ctx -> { + ResultSetFuture future = pluginCtx.tsService.findAllLatest(DataConstants.DEVICE, deviceId); + Futures.addCallback(future, getCallback(callback, pluginCtx.tsService::convertResultSetToTsKvEntryList), executor); + })); } @Override - public void loadLatestTimeseries(DeviceId deviceId, Collection keys, PluginCallback> callback) { - validate(deviceId); - ListenableFuture> rsListFuture = pluginCtx.tsService.findLatest(DataConstants.DEVICE, deviceId, keys); - Futures.addCallback(rsListFuture, getListCallback(callback, rsList -> - { - List result = new ArrayList<>(); - for (ResultSet rs : rsList) { - Row row = rs.one(); - if (row != null) { - result.add(pluginCtx.tsService.convertResultToTsKvEntry(row)); + public void loadLatestTimeseries(final DeviceId deviceId, final Collection keys, final PluginCallback> callback) { + validate(deviceId, new ValidationCallback(callback, ctx -> { + ListenableFuture> rsListFuture = pluginCtx.tsService.findLatest(DataConstants.DEVICE, deviceId, keys); + Futures.addCallback(rsListFuture, getListCallback(callback, rsList -> + { + List result = new ArrayList<>(); + for (ResultSet rs : rsList) { + Row row = rs.one(); + if (row != null) { + result.add(pluginCtx.tsService.convertResultToTsKvEntry(row)); + } } - } - return result; - }), executor); + return result; + }), executor); + })); } @Override @@ -224,15 +206,6 @@ public final class PluginProcessingContext implements PluginContext { pluginCtx.parentActor.tell(msg, ActorRef.noSender()); } - @Override - public boolean checkAccess(DeviceId deviceId) { - try { - return validate(deviceId); - } catch (IllegalStateException | IllegalArgumentException e) { - return false; - } - } - @Override public PluginId getPluginId() { return pluginCtx.pluginId; @@ -273,7 +246,11 @@ public final class PluginProcessingContext implements PluginContext { return new FutureCallback() { @Override public void onSuccess(@Nullable R result) { - pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, transformer.apply(result)), ActorRef.noSender()); + try { + pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, transformer.apply(result)), ActorRef.noSender()); + } catch (Exception e) { + pluginCtx.self().tell(PluginCallbackMessage.onError(callback, e), ActorRef.noSender()); + } } @Override @@ -287,27 +264,35 @@ public final class PluginProcessingContext implements PluginContext { }; } - // TODO: replace with our own exceptions - private boolean validate(DeviceId deviceId, PluginCallback callback) { + @Override + public void checkAccess(DeviceId deviceId, PluginCallback callback) { + validate(deviceId, new ValidationCallback(callback, ctx -> callback.onSuccess(ctx, null))); + } + + private void validate(DeviceId deviceId, ValidationCallback callback) { if (securityCtx.isPresent()) { final PluginApiCallSecurityContext ctx = securityCtx.get(); if (ctx.isTenantAdmin() || ctx.isCustomerUser()) { - ListenableFuture device = pluginCtx.deviceService.findDeviceById(deviceId); - Futures.addCallback(device, ); - if (device == null) { - throw new IllegalStateException("Device not found!"); - } else { - if (!device.getTenantId().equals(ctx.getTenantId())) { - throw new IllegalArgumentException("Device belongs to different tenant!"); - } else if (ctx.isCustomerUser() && !device.getCustomerId().equals(ctx.getCustomerId())) { - throw new IllegalArgumentException("Device belongs to different customer!"); + ListenableFuture deviceFuture = pluginCtx.deviceService.findDeviceByIdAsync(deviceId); + Futures.addCallback(deviceFuture, getCallback(callback, device -> { + if (device == null) { + return Boolean.FALSE; + } else { + if (!device.getTenantId().equals(ctx.getTenantId())) { + return Boolean.FALSE; + } else if (ctx.isCustomerUser() && !device.getCustomerId().equals(ctx.getCustomerId())) { + return Boolean.FALSE; + } else { + return Boolean.TRUE; + } } - } + })); } else { - return false; + callback.onSuccess(this, Boolean.FALSE); } + } else { + callback.onSuccess(this, Boolean.TRUE); } - return true; } @Override @@ -338,4 +323,15 @@ public final class PluginProcessingContext implements PluginContext { public void scheduleTimeoutMsg(TimeoutMsg msg) { pluginCtx.scheduleTimeoutMsg(msg); } + + + private void convertFuturesAndAddCallback(PluginCallback> callback, List>> futures) { + ListenableFuture> future = Futures.transform(Futures.successfulAsList(futures), + (Function>, ? extends List>) input -> { + List result = new ArrayList<>(); + input.forEach(r -> result.addAll(r)); + return result; + }, executor); + Futures.addCallback(future, getCallback(callback, v -> v), executor); + } } diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java b/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java new file mode 100644 index 0000000000..707afa50dd --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.plugin; + +import com.hazelcast.util.function.Consumer; +import org.thingsboard.server.extensions.api.exception.UnauthorizedException; +import org.thingsboard.server.extensions.api.plugins.PluginCallback; +import org.thingsboard.server.extensions.api.plugins.PluginContext; + +/** + * Created by ashvayka on 21.02.17. + */ +public class ValidationCallback implements PluginCallback { + + private final PluginCallback callback; + private final Consumer action; + + public ValidationCallback(PluginCallback callback, Consumer action) { + this.callback = callback; + this.action = action; + } + + @Override + public void onSuccess(PluginContext ctx, Boolean value) { + if (value) { + action.accept(ctx); + } else { + onFailure(ctx, new UnauthorizedException()); + } + } + + @Override + public void onFailure(PluginContext ctx, Exception e) { + callback.onFailure(ctx, e); + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java b/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java index 73a553767c..60c833bf30 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java @@ -16,17 +16,22 @@ package org.thingsboard.server.dao; import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Statement; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; import com.datastax.driver.core.utils.UUIDs; import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.Result; +import com.google.common.base.Function; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.dao.model.BaseEntity; import org.thingsboard.server.dao.model.wrapper.EntityResultSet; import org.thingsboard.server.dao.model.ModelConstants; +import javax.annotation.Nullable; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -72,6 +77,27 @@ public abstract class AbstractModelDao> extends Abstract return object; } + protected ListenableFuture findOneByStatementAsync(Statement statement) { + if (statement != null) { + statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); + ResultSetFuture resultSetFuture = getSession().executeAsync(statement); + ListenableFuture result = Futures.transform(resultSetFuture, new Function() { + @Nullable + @Override + public T apply(@Nullable ResultSet resultSet) { + Result result = getMapper().map(resultSet); + if (result != null) { + return result.one(); + } else { + return null; + } + } + }); + return result; + } + return Futures.immediateFuture(null); + } + protected Statement getSaveQuery(T dto) { return getMapper().saveQuery(dto); } @@ -100,6 +126,14 @@ public abstract class AbstractModelDao> extends Abstract return findOneByStatement(query); } + public ListenableFuture findByIdAsync(UUID key) { + log.debug("Get entity by key {}", key); + Select.Where query = select().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key)); + log.trace("Execute query {}", query); + return findOneByStatementAsync(query); + } + + public ResultSet removeById(UUID key) { Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key)); log.debug("Remove request: {}", delete.toString()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/Dao.java b/dao/src/main/java/org/thingsboard/server/dao/Dao.java index 7aa35e7c0e..2703cdc23a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/Dao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/Dao.java @@ -16,6 +16,7 @@ package org.thingsboard.server.dao; import com.datastax.driver.core.ResultSet; +import com.google.common.util.concurrent.ListenableFuture; import java.util.List; import java.util.UUID; @@ -26,6 +27,8 @@ public interface Dao { T findById(UUID id); + ListenableFuture findByIdAsync(UUID id); + T save(T t); ResultSet removeById(UUID id); diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java index 681188e6be..214d37c330 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java @@ -15,6 +15,9 @@ */ package org.thingsboard.server.dao.device; +import com.google.common.base.Function; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -70,6 +73,14 @@ public class DeviceServiceImpl implements DeviceService { return getData(deviceEntity); } + @Override + public ListenableFuture findDeviceByIdAsync(DeviceId deviceId) { + log.trace("Executing findDeviceById [{}]", deviceId); + validateId(deviceId, "Incorrect deviceId " + deviceId); + ListenableFuture deviceEntity = deviceDao.findByIdAsync(deviceId.getId()); + return Futures.transform(deviceEntity, (Function) input -> getData(input)); + } + @Override public Optional findDeviceByTenantIdAndName(TenantId tenantId, String name) { log.trace("Executing findDeviceByTenantIdAndName [{}][{}]", tenantId, name); diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/exception/UnauthorizedException.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/exception/UnauthorizedException.java new file mode 100644 index 0000000000..7b7d0ec811 --- /dev/null +++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/exception/UnauthorizedException.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.api.exception; + +/** + * Created by ashvayka on 21.02.17. + */ +public class UnauthorizedException extends Exception { +} diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java index 64d4d535ea..d25c8db60c 100644 --- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java +++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java @@ -42,7 +42,7 @@ public interface PluginContext { void reply(PluginToRuleMsg msg); - boolean checkAccess(DeviceId deviceId); + void checkAccess(DeviceId deviceId, PluginCallback callback); Optional getSecurityCtx(); @@ -92,13 +92,9 @@ public interface PluginContext { Attributes API */ - void saveAttributes(DeviceId deviceId, String attributeType, List attributes, PluginCallback callback); + void saveAttributes(TenantId tenantId, DeviceId deviceId, String attributeType, List attributes, PluginCallback callback); - void removeAttributes(DeviceId deviceId, String scope, List attributeKeys, PluginCallback callback); - - void saveAttributesByDevice(TenantId tenantId, DeviceId deviceId, String attributeType, List attributes, PluginCallback callback); - - void removeAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List attributeKeys, PluginCallback callback); + void removeAttributes(TenantId tenantId, DeviceId deviceId, String scope, List attributeKeys, PluginCallback callback); void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, PluginCallback> callback); diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRestMsgHandler.java index 4601916fb7..f60f2d815d 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRestMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRestMsgHandler.java @@ -25,6 +25,7 @@ import org.springframework.util.StringUtils; import org.springframework.web.context.request.async.DeferredResult; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.extensions.api.plugins.PluginCallback; import org.thingsboard.server.extensions.api.plugins.PluginContext; import org.thingsboard.server.extensions.api.plugins.handlers.DefaultRestMsgHandler; import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse; @@ -62,27 +63,34 @@ public class RpcRestMsgHandler extends DefaultRestMsgHandler { String method = pathParams[0].toUpperCase(); if (DataConstants.ONEWAY.equals(method) || DataConstants.TWOWAY.equals(method)) { DeviceId deviceId = DeviceId.fromString(pathParams[1]); - if (!ctx.checkAccess(deviceId)) { - msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); - return; - } JsonNode rpcRequestBody = jsonMapper.readTree(request.getRequestBody()); RpcRequest cmd = new RpcRequest(rpcRequestBody.get("method").asText(), jsonMapper.writeValueAsString(rpcRequestBody.get("params"))); - if (rpcRequestBody.has("timeout")) { - cmd.setTimeout(rpcRequestBody.get("timeout").asLong()); - } - long timeout = cmd.getTimeout() != null ? cmd.getTimeout() : defaultTimeout; - ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(cmd.getMethodName(), cmd.getRequestData()); - ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(UUID.randomUUID(), - ctx.getSecurityCtx().orElseThrow(() -> new IllegalStateException("Security context is empty!")).getTenantId(), - deviceId, - DataConstants.ONEWAY.equals(method), - System.currentTimeMillis() + timeout, - body - ); - rpcManager.process(ctx, new LocalRequestMetaData(rpcRequest, msg.getResponseHolder())); + + ctx.checkAccess(deviceId, new PluginCallback() { + @Override + public void onSuccess(PluginContext ctx, Void value) { + if (rpcRequestBody.has("timeout")) { + cmd.setTimeout(rpcRequestBody.get("timeout").asLong()); + } + long timeout = cmd.getTimeout() != null ? cmd.getTimeout() : defaultTimeout; + ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(cmd.getMethodName(), cmd.getRequestData()); + ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(UUID.randomUUID(), + ctx.getSecurityCtx().orElseThrow(() -> new IllegalStateException("Security context is empty!")).getTenantId(), + deviceId, + DataConstants.ONEWAY.equals(method), + System.currentTimeMillis() + timeout, + body + ); + rpcManager.process(ctx, new LocalRequestMetaData(rpcRequest, msg.getResponseHolder())); + } + + @Override + public void onFailure(PluginContext ctx, Exception e) { + msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); + } + }); valid = true; } } diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java index 28d8b7c5ce..2bd17aab87 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java @@ -164,7 +164,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { } }); if (attributes.size() > 0) { - ctx.saveAttributes(deviceId, scope, attributes, new PluginCallback() { + ctx.saveAttributes(ctx.getSecurityCtx().orElseThrow(() -> new IllegalArgumentException()).getTenantId(), deviceId, scope, attributes, new PluginCallback() { @Override public void onSuccess(PluginContext ctx, Void value) { msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK)); @@ -182,8 +182,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { } } } - } catch (IOException e) { - log.debug("Failed to process POST request due to IO exception", e); + } catch (IOException | RuntimeException e) { + log.debug("Failed to process POST request due to exception", e); } msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); } @@ -202,7 +202,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { String keysParam = request.getParameter("keys"); if (!StringUtils.isEmpty(keysParam)) { String[] keys = keysParam.split(","); - ctx.removeAttributes(deviceId, scope, Arrays.asList(keys), new PluginCallback() { + ctx.removeAttributes(ctx.getSecurityCtx().orElseThrow(() -> new IllegalArgumentException()).getTenantId(), deviceId, scope, Arrays.asList(keys), new PluginCallback() { @Override public void onSuccess(PluginContext ctx, Void value) { msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK)); diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java index d9bfba073e..1ce797fe4b 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java @@ -120,7 +120,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler { @Override public void handleUpdateAttributesRequest(PluginContext ctx, TenantId tenantId, RuleId ruleId, UpdateAttributesRequestRuleToPluginMsg msg) { UpdateAttributesRequest request = msg.getPayload(); - ctx.saveAttributes(msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getAttributes().stream().collect(Collectors.toList()), + ctx.saveAttributes(msg.getTenantId(), msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getAttributes().stream().collect(Collectors.toList()), new PluginCallback() { @Override public void onSuccess(PluginContext ctx, Void value) { diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java index dd30f99e9c..fbfacd3983 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java @@ -21,6 +21,7 @@ import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.kv.*; +import org.thingsboard.server.extensions.api.exception.UnauthorizedException; import org.thingsboard.server.extensions.api.plugins.PluginCallback; import org.thingsboard.server.extensions.api.plugins.PluginContext; import org.thingsboard.server.extensions.api.plugins.handlers.DefaultWebsocketMsgHandler; @@ -122,8 +123,14 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { @Override public void onFailure(PluginContext ctx, Exception e) { log.error("Failed to fetch attributes!", e); - SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, - "Failed to fetch attributes!"); + SubscriptionUpdate update; + if (UnauthorizedException.class.isInstance(e)) { + update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED, + SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg()); + } else { + update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, + "Failed to fetch attributes!"); + } sendWsMsg(ctx, sessionRef, update); } }; @@ -207,8 +214,14 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { @Override public void onFailure(PluginContext ctx, Exception e) { - SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, - "Failed to fetch data!"); + SubscriptionUpdate update; + if (UnauthorizedException.class.isInstance(e)) { + update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED, + SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg()); + } else { + update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, + "Failed to fetch data!"); + } sendWsMsg(ctx, sessionRef, update); } }); @@ -263,12 +276,6 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { return; } DeviceId deviceId = DeviceId.fromString(cmd.getDeviceId()); - if (!ctx.checkAccess(deviceId)) { - SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED, - SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg()); - sendWsMsg(ctx, sessionRef, update); - return; - } List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); List queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList()); ctx.loadTimeseries(deviceId, queries, new PluginCallback>() { @@ -279,8 +286,15 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { @Override public void onFailure(PluginContext ctx, Exception e) { - sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, - "Failed to fetch data!")); + SubscriptionUpdate update; + if (UnauthorizedException.class.isInstance(e)) { + update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED, + SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg()); + } else { + update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, + "Failed to fetch data!"); + } + sendWsMsg(ctx, sessionRef, update); } }); } @@ -313,13 +327,6 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { sendWsMsg(ctx, sessionRef, update); return false; } - DeviceId deviceId = DeviceId.fromString(cmd.getDeviceId()); - if (!ctx.checkAccess(deviceId)) { - SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED, - SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg()); - sendWsMsg(ctx, sessionRef, update); - return false; - } return true; }