Rule Engine draft

This commit is contained in:
Andrew Shvayka 2018-02-07 14:17:33 +02:00
parent cf7e5861f9
commit f1a4c683f2
19 changed files with 497 additions and 2 deletions

View File

@ -247,7 +247,7 @@ spring:
database-platform: "${SPRING_JPA_DATABASE_PLATFORM:org.hibernate.dialect.HSQLDialect}"
datasource:
driverClassName: "${SPRING_DRIVER_CLASS_NAME:org.hsqldb.jdbc.JDBCDriver}"
url: "${SPRING_DATASOURCE_URL:jdbc:hsqldb:file:${SQL_DATA_FOLDER:/tmp}/thingsboardDb;sql.enforce_size=false}"
url: "${SPRING_DATASOURCE_URL:jdbc:hsqldb:file:${SQL_DATA_FOLDER:/tmp}/thingsboardDb;sql.enforce_size=false;hsqldb.log_size=5}"
username: "${SPRING_DATASOURCE_USERNAME:sa}"
password: "${SPRING_DATASOURCE_PASSWORD:}"

View File

@ -23,7 +23,6 @@
<version>1.4.0-SNAPSHOT</version>
<artifactId>thingsboard</artifactId>
</parent>
<groupId>org.thingsboard</groupId>
<artifactId>dao</artifactId>
<packaging>jar</packaging>

View File

@ -85,6 +85,7 @@
<module>extensions-api</module>
<module>extensions-core</module>
<module>extensions</module>
<module>rule-engine</module>
<module>transport</module>
<module>ui</module>
<module>tools</module>

42
rule-engine/pom.xml Normal file
View File

@ -0,0 +1,42 @@
<!--
Copyright © 2016-2017 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>1.4.0-SNAPSHOT</version>
<artifactId>thingsboard</artifactId>
</parent>
<groupId>org.thingsboard</groupId>
<artifactId>rule-engine</artifactId>
<packaging>pom</packaging>
<name>Thingsboard Extensions</name>
<url>https://thingsboard.io</url>
<properties>
<main.dir>${basedir}/..</main.dir>
</properties>
<modules>
<module>rule-engine-api</module>
<module>rule-engine-components</module>
</modules>
</project>

View File

@ -0,0 +1,67 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright © 2016-2017 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>1.4.0-SNAPSHOT</version>
<artifactId>rule-engine</artifactId>
</parent>
<groupId>org.thingsboard.rule-engine</groupId>
<artifactId>rule-engine-api</artifactId>
<packaging>jar</packaging>
<name>Thingsboard Rule Engine API</name>
<url>https://thingsboard.io</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.dir>${basedir}/../..</main.dir>
</properties>
<dependencies>
<dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>message</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.thingsboard</groupId>
<artifactId>extensions-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.thingsboard</groupId>
<artifactId>dao</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,29 @@
package org.thingsboard.rule.engine.api;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.dao.attributes.AttributesService;
import java.util.UUID;
/**
* Created by ashvayka on 13.01.18.
*/
public interface TbContext {
void tellNext(TbMsg msg);
void tellNext(TbMsg msg, String relationType);
void tellSelf(TbMsg msg, long delayMs);
void tellOthers(TbMsg msg);
void tellSibling(TbMsg msg, ServerAddress address);
void spawn(TbMsg msg);
void ack(UUID msg);
AttributesService getAttributesService();
}

View File

@ -0,0 +1,22 @@
package org.thingsboard.rule.engine.api;
import lombok.Data;
import org.thingsboard.server.common.data.id.EntityId;
import java.io.Serializable;
import java.util.UUID;
/**
* Created by ashvayka on 13.01.18.
*/
@Data
public final class TbMsg implements Serializable {
private final UUID id;
private final String type;
private final EntityId originator;
private final TbMsgMetaData metaData;
private final byte[] data;
}

View File

@ -0,0 +1,24 @@
package org.thingsboard.rule.engine.api;
import lombok.Data;
import java.io.Serializable;
import java.util.Map;
/**
* Created by ashvayka on 13.01.18.
*/
@Data
public final class TbMsgMetaData implements Serializable {
private Map<String, String> data;
public String getValue(String key) {
return data.get(key);
}
public void putValue(String key, String value) {
data.put(key, value);
}
}

View File

@ -0,0 +1,16 @@
package org.thingsboard.rule.engine.api;
import java.util.concurrent.ExecutionException;
/**
* Created by ashvayka on 19.01.18.
*/
public interface TbNode {
void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException;
void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException;
void destroy();
}

View File

@ -0,0 +1,14 @@
package org.thingsboard.rule.engine.api;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Data;
/**
* Created by ashvayka on 19.01.18.
*/
@Data
public class TbNodeConfiguration {
private JsonNode data;
}

View File

@ -0,0 +1,14 @@
package org.thingsboard.rule.engine.api;
import com.fasterxml.jackson.core.JsonProcessingException;
/**
* Created by ashvayka on 19.01.18.
*/
public class TbNodeException extends Exception {
public TbNodeException(Exception e) {
super(e);
}
}

View File

@ -0,0 +1,7 @@
package org.thingsboard.rule.engine.api;
/**
* Created by ashvayka on 19.01.18.
*/
public class TbNodeState {
}

View File

@ -0,0 +1,67 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright © 2016-2017 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>1.4.0-SNAPSHOT</version>
<artifactId>rule-engine</artifactId>
</parent>
<groupId>org.thingsboard.rule-engine</groupId>
<artifactId>rule-engine-components</artifactId>
<packaging>jar</packaging>
<name>Thingsboard Rule Engine Components</name>
<url>https://thingsboard.io</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.dir>${basedir}/../..</main.dir>
</properties>
<dependencies>
<dependency>
<groupId>org.thingsboard</groupId>
<artifactId>dao</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.thingsboard</groupId>
<artifactId>extensions-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.thingsboard.rule-engine</groupId>
<artifactId>rule-engine-api</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,23 @@
package org.thingsboard.rule.engine;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
/**
* Created by ashvayka on 19.01.18.
*/
public class TbNodeUtils {
private static final ObjectMapper mapper = new ObjectMapper();
public static <T> T convert(TbNodeConfiguration configuration, Class<T> clazz) throws TbNodeException {
try {
return mapper.treeToValue(configuration.getData(), clazz);
} catch (JsonProcessingException e) {
throw new TbNodeException(e);
}
}
}

View File

@ -0,0 +1,35 @@
package org.thingsboard.rule.engine.filter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.*;
import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.dao.attributes.AttributesService;
import java.util.List;
/**
* Created by ashvayka on 19.01.18.
*/
@Slf4j
public class TbMsgTypeFilterNode implements TbNode {
TbMsgTypeFilterNodeConfiguration config;
@Override
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbMsgTypeFilterNodeConfiguration.class);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
ctx.tellNext(msg, Boolean.toString(config.getMessageTypes().contains(msg.getType())));
}
@Override
public void destroy() {
}
}

View File

@ -0,0 +1,15 @@
package org.thingsboard.rule.engine.filter;
import lombok.Data;
import java.util.List;
/**
* Created by ashvayka on 19.01.18.
*/
@Data
public class TbMsgTypeFilterNodeConfiguration {
private List<String> messageTypes;
}

View File

@ -0,0 +1,51 @@
package org.thingsboard.rule.engine.metadata;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.*;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.dao.attributes.AttributesService;
import java.util.List;
/**
* Created by ashvayka on 19.01.18.
*/
@Slf4j
public class TbGetAttributesNode implements TbNode {
TbGetAttributesNodeConfiguration config;
@Override
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbGetAttributesNodeConfiguration.class);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
try {
//TODO: refactor this to work async and fetch attributes from cache.
AttributesService service = ctx.getAttributesService();
fetchAttributes(msg, service, config.getClientAttributeNames(), DataConstants.CLIENT_SCOPE, "cs.");
fetchAttributes(msg, service, config.getServerAttributeNames(), DataConstants.SERVER_SCOPE, "ss.");
fetchAttributes(msg, service, config.getSharedAttributeNames(), DataConstants.SHARED_SCOPE, "shared.");
ctx.tellNext(msg);
} catch (Exception e) {
log.warn("[{}][{}] Failed to fetch attributes", msg.getOriginator(), msg.getId(), e);
throw new TbNodeException(e);
}
}
private void fetchAttributes(TbMsg msg, AttributesService service, List<String> attributeNames, String scope, String prefix) throws InterruptedException, java.util.concurrent.ExecutionException {
if (attributeNames != null && attributeNames.isEmpty()) {
List<AttributeKvEntry> attributes = service.find(msg.getOriginator(), scope, attributeNames).get();
attributes.forEach(attr -> msg.getMetaData().putValue(prefix + attr.getKey(), attr.getValueAsString()));
}
}
@Override
public void destroy() {
}
}

View File

@ -0,0 +1,17 @@
package org.thingsboard.rule.engine.metadata;
import lombok.Data;
import java.util.List;
/**
* Created by ashvayka on 19.01.18.
*/
@Data
public class TbGetAttributesNodeConfiguration {
private List<String> clientAttributeNames;
private List<String> sharedAttributeNames;
private List<String> serverAttributeNames;
}

View File

@ -0,0 +1,52 @@
package org.thingsboard.rule.engine.transform;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.*;
import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.dao.attributes.AttributesService;
import java.util.List;
/**
* Created by ashvayka on 19.01.18.
*/
@Slf4j
public class TbTransformNode implements TbNode {
TbGetAttributesNodeConfiguration config;
@Override
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbGetAttributesNodeConfiguration.class);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
try {
//TODO: refactor this to work async and fetch attributes from cache.
AttributesService service = ctx.getAttributesService();
fetchAttributes(msg, service, config.getClientAttributeNames(), DataConstants.CLIENT_SCOPE, "cs.");
fetchAttributes(msg, service, config.getServerAttributeNames(), DataConstants.SERVER_SCOPE, "ss.");
fetchAttributes(msg, service, config.getSharedAttributeNames(), DataConstants.SHARED_SCOPE, "shared.");
ctx.tellNext(msg);
} catch (Exception e) {
log.warn("[{}][{}] Failed to fetch attributes", msg.getOriginator(), msg.getId(), e);
throw new TbNodeException(e);
}
}
private void fetchAttributes(TbMsg msg, AttributesService service, List<String> attributeNames, String scope, String prefix) throws InterruptedException, java.util.concurrent.ExecutionException {
if (attributeNames != null && attributeNames.isEmpty()) {
List<AttributeKvEntry> attributes = service.find(msg.getOriginator(), scope, attributeNames).get();
attributes.forEach(attr -> msg.getMetaData().putValue(prefix + attr.getKey(), attr.getValueAsString()));
}
}
@Override
public void destroy() {
}
}