Merge pull request #296 from thingsboard/feature/rpc-rule

RPC rule feature
This commit is contained in:
Andrew Shvayka 2017-09-18 12:08:19 +03:00 committed by GitHub
commit 1f39d20ab7
14 changed files with 414 additions and 6 deletions

View File

@ -44,6 +44,7 @@ import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.event.EventService;
import org.thingsboard.server.dao.plugin.PluginService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.rule.RuleService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
@ -109,6 +110,9 @@ public class ActorSystemContext {
@Autowired
@Getter private AlarmService alarmService;
@Autowired
@Getter private RelationService relationService;
@Autowired
@Getter @Setter private PluginWebSocketMsgEndpoint wsMsgEndpoint;

View File

@ -33,6 +33,8 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.plugin.PluginMetaData;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.RuleMetaData;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
@ -394,6 +396,16 @@ public final class PluginProcessingContext implements PluginContext {
}
}
@Override
public ListenableFuture<List<EntityRelation>> findByFromAndType(EntityId from, String relationType) {
return this.pluginCtx.relationService.findByFromAndType(from, relationType, RelationTypeGroup.COMMON);
}
@Override
public ListenableFuture<List<EntityRelation>> findByToAndType(EntityId from, String relationType) {
return this.pluginCtx.relationService.findByToAndType(from, relationType, RelationTypeGroup.COMMON);
}
@Override
public Optional<ServerAddress> resolve(EntityId entityId) {
return pluginCtx.routingService.resolveById(entityId);

View File

@ -30,6 +30,7 @@ import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.plugin.PluginService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.rule.RuleService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
@ -61,6 +62,7 @@ public final class SharedPluginProcessingContext {
final AttributesService attributesService;
final ClusterRpcService rpcService;
final ClusterRoutingService routingService;
final RelationService relationService;
final PluginId pluginId;
final TenantId tenantId;
@ -83,6 +85,7 @@ public final class SharedPluginProcessingContext {
this.pluginService = sysContext.getPluginService();
this.customerService = sysContext.getCustomerService();
this.tenantService = sysContext.getTenantService();
this.relationService = sysContext.getRelationService();
}
public PluginId getPluginId() {

View File

@ -7,13 +7,13 @@
</encoder>
</appender>
<logger name="org.thingsboard.server" level="INFO"/>
<logger name="org.thingsboard.server" level="WARN"/>
<logger name="org.springframework" level="WARN"/>
<logger name="org.springframework.boot.test" level="DEBUG"/>
<logger name="org.apache.cassandra" level="WARN"/>
<logger name="org.cassandraunit" level="INFO"/>
<logger name="akka" level="DEBUG" />
<logger name="akka" level="INFO" />
<root level="WARN">
<appender-ref ref="console"/>

View File

@ -9,8 +9,8 @@
<logger name="org.thingsboard.server.dao" level="WARN"/>
<logger name="org.apache.cassandra" level="WARN"/>
<logger name="org.cassandraunit" level="INFO" />
<logger name="org.apache.cassandra" level="INFO" />
<logger name="org.cassandraunit" level="WARN" />
<logger name="org.apache.cassandra" level="WARN" />
<root level="WARN">
<appender-ref ref="console"/>

View File

@ -15,11 +15,14 @@
*/
package org.thingsboard.server.extensions.api.plugins;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.extensions.api.plugins.msg.PluginToRuleMsg;
import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
@ -109,4 +112,12 @@ public interface PluginContext {
void getCustomerDevices(TenantId tenantId, CustomerId customerId, int limit, PluginCallback<List<Device>> callback);
/*
* Relations API
* */
ListenableFuture<List<EntityRelation>> findByFromAndType(EntityId from, String relationType);
ListenableFuture<List<EntityRelation>> findByToAndType(EntityId from, String relationType);
}

View File

@ -0,0 +1,106 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.core.action.rpc;
import lombok.extern.slf4j.Slf4j;
import org.apache.velocity.Template;
import org.apache.velocity.VelocityContext;
import org.apache.velocity.runtime.parser.ParseException;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
import org.thingsboard.server.extensions.api.component.Action;
import org.thingsboard.server.extensions.api.plugins.PluginAction;
import org.thingsboard.server.extensions.api.plugins.msg.PluginToRuleMsg;
import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg;
import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
import org.thingsboard.server.extensions.api.rules.RuleContext;
import org.thingsboard.server.extensions.api.rules.RuleProcessingMetaData;
import org.thingsboard.server.extensions.api.rules.SimpleRuleLifecycleComponent;
import org.thingsboard.server.extensions.core.utils.VelocityUtils;
import java.util.Optional;
/**
* Created by ashvayka on 14.09.17.
*/
@Action(name = "Server Side RPC Call Action", descriptor = "ServerSideRpcCallActionDescriptor.json", configuration = ServerSideRpcCallActionConfiguration.class)
@Slf4j
public class ServerSideRpcCallAction extends SimpleRuleLifecycleComponent implements PluginAction<ServerSideRpcCallActionConfiguration> {
private ServerSideRpcCallActionConfiguration configuration;
private Optional<Template> deviceIdTemplate;
private Optional<Template> fromDeviceRelationTemplate;
private Optional<Template> toDeviceRelationTemplate;
private Optional<Template> rpcCallMethodTemplate;
private Optional<Template> rpcCallBodyTemplate;
@Override
public void init(ServerSideRpcCallActionConfiguration configuration) {
this.configuration = configuration;
try {
deviceIdTemplate = toTemplate(configuration.getDeviceIdTemplate(), "Device Id Template");
fromDeviceRelationTemplate = toTemplate(configuration.getFromDeviceRelationTemplate(), "From Device Relation Template");
toDeviceRelationTemplate = toTemplate(configuration.getToDeviceRelationTemplate(), "To Device Relation Template");
rpcCallMethodTemplate = toTemplate(configuration.getRpcCallMethodTemplate(), "RPC Call Method Template");
rpcCallBodyTemplate = toTemplate(configuration.getRpcCallBodyTemplate(), "RPC Call Body Template");
} catch (ParseException e) {
log.error("Failed to create templates based on provided configuration!", e);
throw new RuntimeException("Failed to create templates based on provided configuration!", e);
}
}
@Override
public Optional<RuleToPluginMsg<?>> convert(RuleContext ctx, ToDeviceActorMsg toDeviceActorMsg, RuleProcessingMetaData metadata) {
String sendFlag = configuration.getSendFlag();
if (StringUtils.isEmpty(sendFlag) || (Boolean) metadata.get(sendFlag).orElse(Boolean.FALSE)) {
VelocityContext context = VelocityUtils.createContext(metadata);
ServerSideRpcCallActionMsg.ServerSideRpcCallActionMsgBuilder builder = ServerSideRpcCallActionMsg.builder();
deviceIdTemplate.ifPresent(t -> builder.deviceId(VelocityUtils.merge(t, context)));
fromDeviceRelationTemplate.ifPresent(t -> builder.fromDeviceRelation(VelocityUtils.merge(t, context)));
toDeviceRelationTemplate.ifPresent(t -> builder.toDeviceRelation(VelocityUtils.merge(t, context)));
rpcCallMethodTemplate.ifPresent(t -> builder.rpcCallMethod(VelocityUtils.merge(t, context)));
rpcCallBodyTemplate.ifPresent(t -> builder.rpcCallBody(VelocityUtils.merge(t, context)));
return Optional.of(new ServerSideRpcCallRuleToPluginActionMsg(toDeviceActorMsg.getTenantId(), toDeviceActorMsg.getCustomerId(), toDeviceActorMsg.getDeviceId(),
builder.build()));
} else {
return Optional.empty();
}
}
private Optional<Template> toTemplate(String source, String name) throws ParseException {
if (!StringUtils.isEmpty(source)) {
return Optional.of(VelocityUtils.create(source, name));
} else {
return Optional.empty();
}
}
@Override
public Optional<ToDeviceMsg> convert(PluginToRuleMsg<?> response) {
if (response instanceof ResponsePluginToRuleMsg) {
return Optional.of(((ResponsePluginToRuleMsg) response).getPayload());
}
return Optional.empty();
}
@Override
public boolean isOneWayAction() {
return true;
}
}

View File

@ -0,0 +1,34 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.core.action.rpc;
import lombok.Data;
/**
* @author Andrew Shvayka
*/
@Data
public class ServerSideRpcCallActionConfiguration {
private String sendFlag;
private String deviceIdTemplate;
private String rpcCallMethodTemplate;
private String rpcCallBodyTemplate;
private long rpcCallTimeoutInSec;
private String fromDeviceRelationTemplate;
private String toDeviceRelationTemplate;
}

View File

@ -0,0 +1,38 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.core.action.rpc;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
* Created by ashvayka on 14.09.17.
*/
@Data
@Builder
public class ServerSideRpcCallActionMsg implements Serializable {
private String deviceId;
private String rpcCallMethod;
private String rpcCallBody;
private long rpcCallTimeoutInSec;
private String fromDeviceRelation;
private String toDeviceRelation;
}

View File

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.core.action.rpc;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.extensions.api.plugins.msg.AbstractRuleToPluginMsg;
/**
* Created by ashvayka on 14.09.17.
*/
public class ServerSideRpcCallRuleToPluginActionMsg extends AbstractRuleToPluginMsg<ServerSideRpcCallActionMsg> {
public ServerSideRpcCallRuleToPluginActionMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId,
ServerSideRpcCallActionMsg payload) {
super(tenantId, customerId, deviceId, payload);
}
}

View File

@ -19,15 +19,19 @@ import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.extensions.api.component.Plugin;
import org.thingsboard.server.extensions.api.plugins.AbstractPlugin;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.handlers.DefaultRuleMsgHandler;
import org.thingsboard.server.extensions.api.plugins.handlers.RestMsgHandler;
import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
import org.thingsboard.server.extensions.core.action.rpc.ServerSideRpcCallAction;
import org.thingsboard.server.extensions.core.plugin.rpc.handlers.RpcRestMsgHandler;
import org.thingsboard.server.extensions.core.plugin.rpc.handlers.RpcRuleMsgHandler;
/**
* @author Andrew Shvayka
*/
@Plugin(name = "RPC Plugin", actions = {}, descriptor = "RpcPluginDescriptor.json", configuration = RpcPluginConfiguration.class)
@Plugin(name = "RPC Plugin", actions = {ServerSideRpcCallAction.class}, descriptor = "RpcPluginDescriptor.json", configuration = RpcPluginConfiguration.class)
@Slf4j
public class RpcPlugin extends AbstractPlugin<RpcPluginConfiguration> {
@ -60,6 +64,11 @@ public class RpcPlugin extends AbstractPlugin<RpcPluginConfiguration> {
restMsgHandler.setDefaultTimeout(configuration.getDefaultTimeout());
}
@Override
protected RuleMsgHandler getRuleMsgHandler() {
return new RpcRuleMsgHandler();
}
@Override
public void resume(PluginContext ctx) {

View File

@ -0,0 +1,102 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.core.plugin.rpc.handlers;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.extensions.api.plugins.PluginCallback;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequest;
import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestBody;
import org.thingsboard.server.extensions.api.rules.RuleException;
import org.thingsboard.server.extensions.core.action.rpc.ServerSideRpcCallActionMsg;
import org.thingsboard.server.extensions.core.action.rpc.ServerSideRpcCallRuleToPluginActionMsg;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Created by ashvayka on 14.09.17.
*/
@Slf4j
public class RpcRuleMsgHandler implements RuleMsgHandler {
@Override
public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg<?> msg) throws RuleException {
if (msg instanceof ServerSideRpcCallRuleToPluginActionMsg) {
handle(ctx, tenantId, ruleId, ((ServerSideRpcCallRuleToPluginActionMsg) msg).getPayload());
} else {
throw new RuntimeException("Not supported msg: " + msg + "!");
}
}
private void handle(final PluginContext ctx, TenantId tenantId, RuleId ruleId, ServerSideRpcCallActionMsg msg) {
DeviceId deviceId = new DeviceId(UUID.fromString(msg.getDeviceId()));
ctx.checkAccess(deviceId, new PluginCallback<Void>() {
@Override
public void onSuccess(PluginContext dummy, Void value) {
try {
List<EntityId> deviceIds;
if (StringUtils.isEmpty(msg.getFromDeviceRelation()) && StringUtils.isEmpty(msg.getToDeviceRelation())) {
deviceIds = Collections.singletonList(deviceId);
} else if (!StringUtils.isEmpty(msg.getFromDeviceRelation())) {
List<EntityRelation> relations = ctx.findByFromAndType(deviceId, msg.getFromDeviceRelation()).get();
deviceIds = relations.stream().map(EntityRelation::getTo).collect(Collectors.toList());
} else {
List<EntityRelation> relations = ctx.findByToAndType(deviceId, msg.getFromDeviceRelation()).get();
deviceIds = relations.stream().map(EntityRelation::getFrom).collect(Collectors.toList());
}
ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(msg.getRpcCallMethod(), msg.getRpcCallBody());
long expirationTime = System.currentTimeMillis() + msg.getRpcCallTimeoutInSec();
for (EntityId address : deviceIds) {
DeviceId tmpId = new DeviceId(address.getId());
ctx.checkAccess(tmpId, new PluginCallback<Void>() {
@Override
public void onSuccess(PluginContext ctx, Void value) {
ctx.sendRpcRequest(new ToDeviceRpcRequest(UUID.randomUUID(),
tenantId, tmpId, true, expirationTime, body)
);
log.trace("[{}] Sent RPC Call Action msg", tmpId);
}
@Override
public void onFailure(PluginContext ctx, Exception e) {
log.info("[{}] Failed to process RPC Call Action msg", tmpId, e);
}
});
}
} catch (Exception e) {
log.info("Failed to process RPC Call Action msg", e);
}
}
@Override
public void onFailure(PluginContext dummy, Exception e) {
log.info("[{}] Failed to process RPC Call Action msg", deviceId, e);
}
});
}
}

View File

@ -4,7 +4,7 @@
"type": "object",
"properties": {
"sendFlag": {
"title": "Send flag",
"title": "Send flag (empty or 'isNewAlarm', 'isExistingAlarm', 'isClearedAlarm', 'isNewOrClearedAlarm')",
"type": "string"
},
"fromTemplate": {

View File

@ -0,0 +1,57 @@
{
"schema": {
"title": "Send Mail Action Configuration",
"type": "object",
"properties": {
"sendFlag": {
"title": "Send flag (empty or 'isNewAlarm', 'isExistingAlarm', 'isClearedAlarm', 'isNewOrClearedAlarm')",
"type": "string"
},
"deviceIdTemplate": {
"title": "Device ID template",
"type": "string",
"default": "$deviceId"
},
"rpcCallMethodTemplate": {
"title": "RPC Call template",
"type": "string"
},
"rpcCallBodyTemplate": {
"title": "RPC Call Body template",
"type": "string"
},
"rpcCallTimeoutInSec": {
"title": "RPC Call timeout in seconds",
"type": "integer",
"default": 60
},
"fromDeviceRelationTemplate": {
"title": "From Device Relation template",
"type": "string"
},
"toDeviceRelationTemplate": {
"title": "To Device Relation template",
"type": "string"
}
},
"required": [
"deviceIdTemplate",
"rpcCallMethodTemplate",
"rpcCallBodyTemplate",
"rpcCallTimeoutInSec"
]
},
"form": [
"sendFlag",
"deviceIdTemplate",
"rpcCallMethodTemplate",
{
"key": "rpcCallBodyTemplate",
"type": "textarea",
"rows": 5
},
"rpcCallTimeoutInSec",
"fromDeviceRelationTemplate",
"toDeviceRelationTemplate"
]
}