Major refactoring of plugin context
This commit is contained in:
parent
2accabf0b0
commit
301f106e54
@ -1,12 +1,12 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
* <p>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
@ -90,121 +90,102 @@ public final class PluginProcessingContext implements PluginContext {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void saveAttributes(DeviceId deviceId, String scope, List<AttributeKvEntry> attributes, PluginCallback<Void> callback) {
|
||||
validate(deviceId);
|
||||
|
||||
ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes);
|
||||
Futures.addCallback(rsListFuture, getListCallback(callback, v -> {
|
||||
onDeviceAttributesChanged(deviceId, scope, attributes);
|
||||
return null;
|
||||
}), executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeAttributes(DeviceId deviceId, String scope, List<String> keys, PluginCallback<Void> callback) {
|
||||
validate(deviceId);
|
||||
ListenableFuture<List<ResultSet>> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys);
|
||||
Futures.addCallback(future, getCallback(callback, v -> null), executor);
|
||||
onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void saveAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<AttributeKvEntry> attributes, PluginCallback<Void> callback) {
|
||||
validate(deviceId);
|
||||
|
||||
public void saveAttributes(final TenantId tenantId, final DeviceId deviceId, final String scope, final List<AttributeKvEntry> attributes, final PluginCallback<Void> callback) {
|
||||
validate(deviceId, new ValidationCallback(callback, ctx -> {
|
||||
ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes);
|
||||
Futures.addCallback(rsListFuture, getListCallback(callback, v -> {
|
||||
onDeviceAttributesChanged(tenantId, deviceId, scope, attributes);
|
||||
return null;
|
||||
}), executor);
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<String> keys, PluginCallback<Void> callback) {
|
||||
validate(deviceId);
|
||||
public void removeAttributes(final TenantId tenantId, final DeviceId deviceId, final String scope, final List<String> keys, final PluginCallback<Void> callback) {
|
||||
validate(deviceId, new ValidationCallback(callback, ctx -> {
|
||||
ListenableFuture<List<ResultSet>> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys);
|
||||
Futures.addCallback(future, getCallback(callback, v -> null), executor);
|
||||
onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, PluginCallback<Optional<AttributeKvEntry>> callback) {
|
||||
validate(deviceId);
|
||||
public void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, final PluginCallback<Optional<AttributeKvEntry>> callback) {
|
||||
validate(deviceId, new ValidationCallback(callback, ctx -> {
|
||||
ListenableFuture<Optional<AttributeKvEntry>> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey);
|
||||
Futures.addCallback(future, getCallback(callback, v -> v), executor);
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void loadAttributes(DeviceId deviceId, String attributeType, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback) {
|
||||
validate(deviceId);
|
||||
public void loadAttributes(DeviceId deviceId, String attributeType, Collection<String> attributeKeys, final PluginCallback<List<AttributeKvEntry>> callback) {
|
||||
validate(deviceId, new ValidationCallback(callback, ctx -> {
|
||||
ListenableFuture<List<AttributeKvEntry>> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys);
|
||||
Futures.addCallback(future, getCallback(callback, v -> v), executor);
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void loadAttributes(DeviceId deviceId, String attributeType, PluginCallback<List<AttributeKvEntry>> callback) {
|
||||
validate(deviceId);
|
||||
validate(deviceId, new ValidationCallback(callback, ctx -> {
|
||||
ListenableFuture<List<AttributeKvEntry>> future = pluginCtx.attributesService.findAll(deviceId, attributeType);
|
||||
Futures.addCallback(future, getCallback(callback, v -> v), executor);
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, PluginCallback<List<AttributeKvEntry>> callback) {
|
||||
validate(deviceId);
|
||||
public void loadAttributes(final DeviceId deviceId, final Collection<String> attributeTypes, final PluginCallback<List<AttributeKvEntry>> callback) {
|
||||
validate(deviceId, new ValidationCallback(callback, ctx -> {
|
||||
List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
|
||||
attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.findAll(deviceId, attributeType)));
|
||||
convertFuturesAndAddCallback(callback, futures);
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback) {
|
||||
validate(deviceId);
|
||||
public void loadAttributes(final DeviceId deviceId, final Collection<String> attributeTypes, final Collection<String> attributeKeys, final PluginCallback<List<AttributeKvEntry>> callback) {
|
||||
validate(deviceId, new ValidationCallback(callback, ctx -> {
|
||||
List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
|
||||
attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys)));
|
||||
convertFuturesAndAddCallback(callback, futures);
|
||||
}
|
||||
|
||||
private void convertFuturesAndAddCallback(PluginCallback<List<AttributeKvEntry>> callback, List<ListenableFuture<List<AttributeKvEntry>>> futures) {
|
||||
ListenableFuture<List<AttributeKvEntry>> future = Futures.transform(Futures.successfulAsList(futures),
|
||||
(Function<? super List<List<AttributeKvEntry>>, ? extends List<AttributeKvEntry>>) input -> {
|
||||
List<AttributeKvEntry> result = new ArrayList<>();
|
||||
input.forEach(r -> result.addAll(r));
|
||||
return result;
|
||||
}, executor);
|
||||
Futures.addCallback(future, getCallback(callback, v -> v), executor);
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void saveTsData(DeviceId deviceId, TsKvEntry entry, PluginCallback<Void> callback) {
|
||||
validate(deviceId);
|
||||
public void saveTsData(final DeviceId deviceId, final TsKvEntry entry, final PluginCallback<Void> callback) {
|
||||
validate(deviceId, new ValidationCallback(callback, ctx -> {
|
||||
ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(DataConstants.DEVICE, deviceId, entry);
|
||||
Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void saveTsData(DeviceId deviceId, List<TsKvEntry> entries, PluginCallback<Void> callback) {
|
||||
validate(deviceId);
|
||||
public void saveTsData(final DeviceId deviceId, final List<TsKvEntry> entries, final PluginCallback<Void> callback) {
|
||||
validate(deviceId, new ValidationCallback(callback, ctx -> {
|
||||
ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(DataConstants.DEVICE, deviceId, entries);
|
||||
Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void loadTimeseries(DeviceId deviceId, List<TsKvQuery> queries, PluginCallback<List<TsKvEntry>> callback) {
|
||||
validate(deviceId);
|
||||
public void loadTimeseries(final DeviceId deviceId, final List<TsKvQuery> queries, final PluginCallback<List<TsKvEntry>> callback) {
|
||||
validate(deviceId, new ValidationCallback(callback, ctx -> {
|
||||
ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, queries);
|
||||
Futures.addCallback(future, getCallback(callback, v -> v), executor);
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void loadLatestTimeseries(DeviceId deviceId, PluginCallback<List<TsKvEntry>> callback) {
|
||||
validate(deviceId);
|
||||
public void loadLatestTimeseries(final DeviceId deviceId, final PluginCallback<List<TsKvEntry>> callback) {
|
||||
validate(deviceId, new ValidationCallback(callback, ctx -> {
|
||||
ResultSetFuture future = pluginCtx.tsService.findAllLatest(DataConstants.DEVICE, deviceId);
|
||||
Futures.addCallback(future, getCallback(callback, pluginCtx.tsService::convertResultSetToTsKvEntryList), executor);
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void loadLatestTimeseries(DeviceId deviceId, Collection<String> keys, PluginCallback<List<TsKvEntry>> callback) {
|
||||
validate(deviceId);
|
||||
public void loadLatestTimeseries(final DeviceId deviceId, final Collection<String> keys, final PluginCallback<List<TsKvEntry>> callback) {
|
||||
validate(deviceId, new ValidationCallback(callback, ctx -> {
|
||||
ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.findLatest(DataConstants.DEVICE, deviceId, keys);
|
||||
Futures.addCallback(rsListFuture, getListCallback(callback, rsList ->
|
||||
{
|
||||
@ -217,6 +198,7 @@ public final class PluginProcessingContext implements PluginContext {
|
||||
}
|
||||
return result;
|
||||
}), executor);
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -224,15 +206,6 @@ public final class PluginProcessingContext implements PluginContext {
|
||||
pluginCtx.parentActor.tell(msg, ActorRef.noSender());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAccess(DeviceId deviceId) {
|
||||
try {
|
||||
return validate(deviceId);
|
||||
} catch (IllegalStateException | IllegalArgumentException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public PluginId getPluginId() {
|
||||
return pluginCtx.pluginId;
|
||||
@ -273,7 +246,11 @@ public final class PluginProcessingContext implements PluginContext {
|
||||
return new FutureCallback<R>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable R result) {
|
||||
try {
|
||||
pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, transformer.apply(result)), ActorRef.noSender());
|
||||
} catch (Exception e) {
|
||||
pluginCtx.self().tell(PluginCallbackMessage.onError(callback, e), ActorRef.noSender());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -287,28 +264,36 @@ public final class PluginProcessingContext implements PluginContext {
|
||||
};
|
||||
}
|
||||
|
||||
// TODO: replace with our own exceptions
|
||||
private boolean validate(DeviceId deviceId, PluginCallback<Device> callback) {
|
||||
@Override
|
||||
public void checkAccess(DeviceId deviceId, PluginCallback<Void> callback) {
|
||||
validate(deviceId, new ValidationCallback(callback, ctx -> callback.onSuccess(ctx, null)));
|
||||
}
|
||||
|
||||
private void validate(DeviceId deviceId, ValidationCallback callback) {
|
||||
if (securityCtx.isPresent()) {
|
||||
final PluginApiCallSecurityContext ctx = securityCtx.get();
|
||||
if (ctx.isTenantAdmin() || ctx.isCustomerUser()) {
|
||||
ListenableFuture<Device> device = pluginCtx.deviceService.findDeviceById(deviceId);
|
||||
Futures.addCallback(device, );
|
||||
ListenableFuture<Device> deviceFuture = pluginCtx.deviceService.findDeviceByIdAsync(deviceId);
|
||||
Futures.addCallback(deviceFuture, getCallback(callback, device -> {
|
||||
if (device == null) {
|
||||
throw new IllegalStateException("Device not found!");
|
||||
return Boolean.FALSE;
|
||||
} else {
|
||||
if (!device.getTenantId().equals(ctx.getTenantId())) {
|
||||
throw new IllegalArgumentException("Device belongs to different tenant!");
|
||||
return Boolean.FALSE;
|
||||
} else if (ctx.isCustomerUser() && !device.getCustomerId().equals(ctx.getCustomerId())) {
|
||||
throw new IllegalArgumentException("Device belongs to different customer!");
|
||||
return Boolean.FALSE;
|
||||
} else {
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
}
|
||||
}));
|
||||
} else {
|
||||
callback.onSuccess(this, Boolean.FALSE);
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
callback.onSuccess(this, Boolean.TRUE);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ServerAddress> resolve(DeviceId deviceId) {
|
||||
@ -338,4 +323,15 @@ public final class PluginProcessingContext implements PluginContext {
|
||||
public void scheduleTimeoutMsg(TimeoutMsg msg) {
|
||||
pluginCtx.scheduleTimeoutMsg(msg);
|
||||
}
|
||||
|
||||
|
||||
private void convertFuturesAndAddCallback(PluginCallback<List<AttributeKvEntry>> callback, List<ListenableFuture<List<AttributeKvEntry>>> futures) {
|
||||
ListenableFuture<List<AttributeKvEntry>> future = Futures.transform(Futures.successfulAsList(futures),
|
||||
(Function<? super List<List<AttributeKvEntry>>, ? extends List<AttributeKvEntry>>) input -> {
|
||||
List<AttributeKvEntry> result = new ArrayList<>();
|
||||
input.forEach(r -> result.addAll(r));
|
||||
return result;
|
||||
}, executor);
|
||||
Futures.addCallback(future, getCallback(callback, v -> v), executor);
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,49 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.actors.plugin;
|
||||
|
||||
import com.hazelcast.util.function.Consumer;
|
||||
import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
|
||||
import org.thingsboard.server.extensions.api.plugins.PluginCallback;
|
||||
import org.thingsboard.server.extensions.api.plugins.PluginContext;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 21.02.17.
|
||||
*/
|
||||
public class ValidationCallback implements PluginCallback<Boolean> {
|
||||
|
||||
private final PluginCallback<?> callback;
|
||||
private final Consumer<PluginContext> action;
|
||||
|
||||
public ValidationCallback(PluginCallback<?> callback, Consumer<PluginContext> action) {
|
||||
this.callback = callback;
|
||||
this.action = action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(PluginContext ctx, Boolean value) {
|
||||
if (value) {
|
||||
action.accept(ctx);
|
||||
} else {
|
||||
onFailure(ctx, new UnauthorizedException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(PluginContext ctx, Exception e) {
|
||||
callback.onFailure(ctx, e);
|
||||
}
|
||||
}
|
||||
@ -16,17 +16,22 @@
|
||||
package org.thingsboard.server.dao;
|
||||
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
import com.datastax.driver.core.ResultSetFuture;
|
||||
import com.datastax.driver.core.Statement;
|
||||
import com.datastax.driver.core.querybuilder.QueryBuilder;
|
||||
import com.datastax.driver.core.querybuilder.Select;
|
||||
import com.datastax.driver.core.utils.UUIDs;
|
||||
import com.datastax.driver.mapping.Mapper;
|
||||
import com.datastax.driver.mapping.Result;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.dao.model.BaseEntity;
|
||||
import org.thingsboard.server.dao.model.wrapper.EntityResultSet;
|
||||
import org.thingsboard.server.dao.model.ModelConstants;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
@ -72,6 +77,27 @@ public abstract class AbstractModelDao<T extends BaseEntity<?>> extends Abstract
|
||||
return object;
|
||||
}
|
||||
|
||||
protected ListenableFuture<T> findOneByStatementAsync(Statement statement) {
|
||||
if (statement != null) {
|
||||
statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
|
||||
ResultSetFuture resultSetFuture = getSession().executeAsync(statement);
|
||||
ListenableFuture<T> result = Futures.transform(resultSetFuture, new Function<ResultSet, T>() {
|
||||
@Nullable
|
||||
@Override
|
||||
public T apply(@Nullable ResultSet resultSet) {
|
||||
Result<T> result = getMapper().map(resultSet);
|
||||
if (result != null) {
|
||||
return result.one();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
});
|
||||
return result;
|
||||
}
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
|
||||
protected Statement getSaveQuery(T dto) {
|
||||
return getMapper().saveQuery(dto);
|
||||
}
|
||||
@ -100,6 +126,14 @@ public abstract class AbstractModelDao<T extends BaseEntity<?>> extends Abstract
|
||||
return findOneByStatement(query);
|
||||
}
|
||||
|
||||
public ListenableFuture<T> findByIdAsync(UUID key) {
|
||||
log.debug("Get entity by key {}", key);
|
||||
Select.Where query = select().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key));
|
||||
log.trace("Execute query {}", query);
|
||||
return findOneByStatementAsync(query);
|
||||
}
|
||||
|
||||
|
||||
public ResultSet removeById(UUID key) {
|
||||
Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key));
|
||||
log.debug("Remove request: {}", delete.toString());
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
package org.thingsboard.server.dao;
|
||||
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
@ -26,6 +27,8 @@ public interface Dao<T> {
|
||||
|
||||
T findById(UUID id);
|
||||
|
||||
ListenableFuture<T> findByIdAsync(UUID id);
|
||||
|
||||
T save(T t);
|
||||
|
||||
ResultSet removeById(UUID id);
|
||||
|
||||
@ -15,6 +15,9 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.device;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@ -70,6 +73,14 @@ public class DeviceServiceImpl implements DeviceService {
|
||||
return getData(deviceEntity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Device> findDeviceByIdAsync(DeviceId deviceId) {
|
||||
log.trace("Executing findDeviceById [{}]", deviceId);
|
||||
validateId(deviceId, "Incorrect deviceId " + deviceId);
|
||||
ListenableFuture<DeviceEntity> deviceEntity = deviceDao.findByIdAsync(deviceId.getId());
|
||||
return Futures.transform(deviceEntity, (Function<? super DeviceEntity, ? extends Device>) input -> getData(input));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Device> findDeviceByTenantIdAndName(TenantId tenantId, String name) {
|
||||
log.trace("Executing findDeviceByTenantIdAndName [{}][{}]", tenantId, name);
|
||||
|
||||
@ -0,0 +1,22 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.extensions.api.exception;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 21.02.17.
|
||||
*/
|
||||
public class UnauthorizedException extends Exception {
|
||||
}
|
||||
@ -42,7 +42,7 @@ public interface PluginContext {
|
||||
|
||||
void reply(PluginToRuleMsg<?> msg);
|
||||
|
||||
boolean checkAccess(DeviceId deviceId);
|
||||
void checkAccess(DeviceId deviceId, PluginCallback<Void> callback);
|
||||
|
||||
Optional<PluginApiCallSecurityContext> getSecurityCtx();
|
||||
|
||||
@ -92,13 +92,9 @@ public interface PluginContext {
|
||||
Attributes API
|
||||
*/
|
||||
|
||||
void saveAttributes(DeviceId deviceId, String attributeType, List<AttributeKvEntry> attributes, PluginCallback<Void> callback);
|
||||
void saveAttributes(TenantId tenantId, DeviceId deviceId, String attributeType, List<AttributeKvEntry> attributes, PluginCallback<Void> callback);
|
||||
|
||||
void removeAttributes(DeviceId deviceId, String scope, List<String> attributeKeys, PluginCallback<Void> callback);
|
||||
|
||||
void saveAttributesByDevice(TenantId tenantId, DeviceId deviceId, String attributeType, List<AttributeKvEntry> attributes, PluginCallback<Void> callback);
|
||||
|
||||
void removeAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<String> attributeKeys, PluginCallback<Void> callback);
|
||||
void removeAttributes(TenantId tenantId, DeviceId deviceId, String scope, List<String> attributeKeys, PluginCallback<Void> callback);
|
||||
|
||||
void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, PluginCallback<Optional<AttributeKvEntry>> callback);
|
||||
|
||||
|
||||
@ -25,6 +25,7 @@ import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.context.request.async.DeferredResult;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.extensions.api.plugins.PluginCallback;
|
||||
import org.thingsboard.server.extensions.api.plugins.PluginContext;
|
||||
import org.thingsboard.server.extensions.api.plugins.handlers.DefaultRestMsgHandler;
|
||||
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
|
||||
@ -62,14 +63,14 @@ public class RpcRestMsgHandler extends DefaultRestMsgHandler {
|
||||
String method = pathParams[0].toUpperCase();
|
||||
if (DataConstants.ONEWAY.equals(method) || DataConstants.TWOWAY.equals(method)) {
|
||||
DeviceId deviceId = DeviceId.fromString(pathParams[1]);
|
||||
if (!ctx.checkAccess(deviceId)) {
|
||||
msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
|
||||
return;
|
||||
}
|
||||
JsonNode rpcRequestBody = jsonMapper.readTree(request.getRequestBody());
|
||||
|
||||
RpcRequest cmd = new RpcRequest(rpcRequestBody.get("method").asText(),
|
||||
jsonMapper.writeValueAsString(rpcRequestBody.get("params")));
|
||||
|
||||
ctx.checkAccess(deviceId, new PluginCallback<Void>() {
|
||||
@Override
|
||||
public void onSuccess(PluginContext ctx, Void value) {
|
||||
if (rpcRequestBody.has("timeout")) {
|
||||
cmd.setTimeout(rpcRequestBody.get("timeout").asLong());
|
||||
}
|
||||
@ -83,6 +84,13 @@ public class RpcRestMsgHandler extends DefaultRestMsgHandler {
|
||||
body
|
||||
);
|
||||
rpcManager.process(ctx, new LocalRequestMetaData(rpcRequest, msg.getResponseHolder()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(PluginContext ctx, Exception e) {
|
||||
msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
|
||||
}
|
||||
});
|
||||
valid = true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -164,7 +164,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
|
||||
}
|
||||
});
|
||||
if (attributes.size() > 0) {
|
||||
ctx.saveAttributes(deviceId, scope, attributes, new PluginCallback<Void>() {
|
||||
ctx.saveAttributes(ctx.getSecurityCtx().orElseThrow(() -> new IllegalArgumentException()).getTenantId(), deviceId, scope, attributes, new PluginCallback<Void>() {
|
||||
@Override
|
||||
public void onSuccess(PluginContext ctx, Void value) {
|
||||
msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
|
||||
@ -182,8 +182,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.debug("Failed to process POST request due to IO exception", e);
|
||||
} catch (IOException | RuntimeException e) {
|
||||
log.debug("Failed to process POST request due to exception", e);
|
||||
}
|
||||
msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
|
||||
}
|
||||
@ -202,7 +202,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
|
||||
String keysParam = request.getParameter("keys");
|
||||
if (!StringUtils.isEmpty(keysParam)) {
|
||||
String[] keys = keysParam.split(",");
|
||||
ctx.removeAttributes(deviceId, scope, Arrays.asList(keys), new PluginCallback<Void>() {
|
||||
ctx.removeAttributes(ctx.getSecurityCtx().orElseThrow(() -> new IllegalArgumentException()).getTenantId(), deviceId, scope, Arrays.asList(keys), new PluginCallback<Void>() {
|
||||
@Override
|
||||
public void onSuccess(PluginContext ctx, Void value) {
|
||||
msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
|
||||
|
||||
@ -120,7 +120,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
|
||||
@Override
|
||||
public void handleUpdateAttributesRequest(PluginContext ctx, TenantId tenantId, RuleId ruleId, UpdateAttributesRequestRuleToPluginMsg msg) {
|
||||
UpdateAttributesRequest request = msg.getPayload();
|
||||
ctx.saveAttributes(msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getAttributes().stream().collect(Collectors.toList()),
|
||||
ctx.saveAttributes(msg.getTenantId(), msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getAttributes().stream().collect(Collectors.toList()),
|
||||
new PluginCallback<Void>() {
|
||||
@Override
|
||||
public void onSuccess(PluginContext ctx, Void value) {
|
||||
|
||||
@ -21,6 +21,7 @@ import org.springframework.util.StringUtils;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.kv.*;
|
||||
import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
|
||||
import org.thingsboard.server.extensions.api.plugins.PluginCallback;
|
||||
import org.thingsboard.server.extensions.api.plugins.PluginContext;
|
||||
import org.thingsboard.server.extensions.api.plugins.handlers.DefaultWebsocketMsgHandler;
|
||||
@ -122,8 +123,14 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
|
||||
@Override
|
||||
public void onFailure(PluginContext ctx, Exception e) {
|
||||
log.error("Failed to fetch attributes!", e);
|
||||
SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
|
||||
SubscriptionUpdate update;
|
||||
if (UnauthorizedException.class.isInstance(e)) {
|
||||
update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
|
||||
SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
|
||||
} else {
|
||||
update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
|
||||
"Failed to fetch attributes!");
|
||||
}
|
||||
sendWsMsg(ctx, sessionRef, update);
|
||||
}
|
||||
};
|
||||
@ -207,8 +214,14 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
|
||||
|
||||
@Override
|
||||
public void onFailure(PluginContext ctx, Exception e) {
|
||||
SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
|
||||
SubscriptionUpdate update;
|
||||
if (UnauthorizedException.class.isInstance(e)) {
|
||||
update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
|
||||
SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
|
||||
} else {
|
||||
update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
|
||||
"Failed to fetch data!");
|
||||
}
|
||||
sendWsMsg(ctx, sessionRef, update);
|
||||
}
|
||||
});
|
||||
@ -263,12 +276,6 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
|
||||
return;
|
||||
}
|
||||
DeviceId deviceId = DeviceId.fromString(cmd.getDeviceId());
|
||||
if (!ctx.checkAccess(deviceId)) {
|
||||
SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
|
||||
SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
|
||||
sendWsMsg(ctx, sessionRef, update);
|
||||
return;
|
||||
}
|
||||
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
|
||||
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList());
|
||||
ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
|
||||
@ -279,8 +286,15 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
|
||||
|
||||
@Override
|
||||
public void onFailure(PluginContext ctx, Exception e) {
|
||||
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
|
||||
"Failed to fetch data!"));
|
||||
SubscriptionUpdate update;
|
||||
if (UnauthorizedException.class.isInstance(e)) {
|
||||
update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
|
||||
SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
|
||||
} else {
|
||||
update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
|
||||
"Failed to fetch data!");
|
||||
}
|
||||
sendWsMsg(ctx, sessionRef, update);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -313,13 +327,6 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
|
||||
sendWsMsg(ctx, sessionRef, update);
|
||||
return false;
|
||||
}
|
||||
DeviceId deviceId = DeviceId.fromString(cmd.getDeviceId());
|
||||
if (!ctx.checkAccess(deviceId)) {
|
||||
SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
|
||||
SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
|
||||
sendWsMsg(ctx, sessionRef, update);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user