Merge with master

This commit is contained in:
Andrii Shvaika 2020-09-30 13:11:58 +03:00
commit c8f8901a7f
21 changed files with 63 additions and 44 deletions

View File

@ -43,7 +43,8 @@
"name": "Save Client Attributes", "name": "Save Client Attributes",
"debugMode": false, "debugMode": false,
"configuration": { "configuration": {
"scope": "CLIENT_SCOPE" "scope": "CLIENT_SCOPE",
"notifyDevice": "false"
} }
}, },
{ {

View File

@ -31,7 +31,8 @@
"name": "Save Client Attributes", "name": "Save Client Attributes",
"debugMode": false, "debugMode": false,
"configuration": { "configuration": {
"scope": "CLIENT_SCOPE" "scope": "CLIENT_SCOPE",
"notifyDevice": "false"
} }
}, },
{ {

View File

@ -226,6 +226,11 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
@Override @Override
public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback) { public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback) {
onAttributesUpdate(tenantId, entityId, scope, attributes, true, callback);
}
@Override
public void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, TbCallback callback) {
onLocalTelemetrySubUpdate(entityId, onLocalTelemetrySubUpdate(entityId,
s -> { s -> {
if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) { if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) {
@ -254,7 +259,7 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
deviceStateService.onDeviceInactivityTimeoutUpdate(new DeviceId(entityId.getId()), attribute.getLongValue().orElse(0L)); deviceStateService.onDeviceInactivityTimeoutUpdate(new DeviceId(entityId.getId()), attribute.getLongValue().orElse(0L));
} }
} }
} else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope)) { } else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) {
clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId,
new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes)) new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes))
, null); , null);

View File

@ -17,13 +17,12 @@ package org.thingsboard.server.service.subscription;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import java.util.List; import java.util.List;
@ -37,9 +36,13 @@ public interface SubscriptionManagerService extends ApplicationListener<Partitio
void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback); void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, TbCallback callback);
void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, TbCallback callback);
void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback empty); void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback empty);
void onAlarmUpdate(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback); void onAlarmUpdate(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback);
void onAlarmDeleted(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback); void onAlarmDeleted(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback);
} }

View File

@ -171,9 +171,14 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Override @Override
public void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback) { public void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback) {
saveAndNotify(tenantId, entityId, scope, attributes, true, callback);
}
@Override
public void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback) {
ListenableFuture<List<Void>> saveFuture = attrService.save(tenantId, entityId, scope, attributes); ListenableFuture<List<Void>> saveFuture = attrService.save(tenantId, entityId, scope, attributes);
addMainCallback(saveFuture, callback); addMainCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes)); addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes, notifyDevice));
} }
@Override @Override
@ -236,11 +241,11 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
, System.currentTimeMillis())), callback); , System.currentTimeMillis())), callback);
} }
private void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) { private void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
if (currentPartitions.contains(tpi)) { if (currentPartitions.contains(tpi)) {
if (subscriptionManagerService.isPresent()) { if (subscriptionManagerService.isPresent()) {
subscriptionManagerService.get().onAttributesUpdate(tenantId, entityId, scope, attributes, TbCallback.EMPTY); subscriptionManagerService.get().onAttributesUpdate(tenantId, entityId, scope, attributes, notifyDevice, TbCallback.EMPTY);
} else { } else {
log.warn("Possible misconfiguration because subscriptionManagerService is null!"); log.warn("Possible misconfiguration because subscriptionManagerService is null!");
} }

View File

@ -34,7 +34,7 @@ public class ControllerSqlTestSuite {
@ClassRule @ClassRule
public static CustomSqlUnit sqlUnit = new CustomSqlUnit( public static CustomSqlUnit sqlUnit = new CustomSqlUnit(
Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql"), Arrays.asList("sql/schema-types-hsql.sql", "sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/schema-entities-idx.sql", "sql/system-data.sql"),
"sql/hsql/drop-all-tables.sql", "sql/hsql/drop-all-tables.sql",
"sql-test.properties"); "sql-test.properties");

View File

@ -33,7 +33,7 @@ public class MqttNoSqlTestSuite {
@ClassRule @ClassRule
public static CustomSqlUnit sqlUnit = new CustomSqlUnit( public static CustomSqlUnit sqlUnit = new CustomSqlUnit(
Arrays.asList("sql/schema-entities-hsql.sql", "sql/system-data.sql"), Arrays.asList("sql/schema-types-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"),
"sql/hsql/drop-all-tables.sql", "sql/hsql/drop-all-tables.sql",
"nosql-test.properties"); "nosql-test.properties");

View File

@ -33,7 +33,7 @@ public class MqttSqlTestSuite {
@ClassRule @ClassRule
public static CustomSqlUnit sqlUnit = new CustomSqlUnit( public static CustomSqlUnit sqlUnit = new CustomSqlUnit(
Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"), Arrays.asList("sql/schema-types-hsql.sql", "sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"),
"sql/hsql/drop-all-tables.sql", "sql/hsql/drop-all-tables.sql",
"sql-test.properties"); "sql-test.properties");

View File

@ -32,7 +32,7 @@ public class RuleEngineSqlTestSuite {
@ClassRule @ClassRule
public static CustomSqlUnit sqlUnit = new CustomSqlUnit( public static CustomSqlUnit sqlUnit = new CustomSqlUnit(
Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"), Arrays.asList("sql/schema-types-hsql.sql", "sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"),
"sql/hsql/drop-all-tables.sql", "sql/hsql/drop-all-tables.sql",
"sql-test.properties"); "sql-test.properties");

View File

@ -33,7 +33,7 @@ public class SystemSqlTestSuite {
@ClassRule @ClassRule
public static CustomSqlUnit sqlUnit = new CustomSqlUnit( public static CustomSqlUnit sqlUnit = new CustomSqlUnit(
Arrays.asList("sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"), Arrays.asList("sql/schema-types-hsql.sql", "sql/schema-ts-hsql.sql", "sql/schema-entities-hsql.sql", "sql/system-data.sql"),
"sql/hsql/drop-all-tables.sql", "sql/hsql/drop-all-tables.sql",
"sql-test.properties"); "sql-test.properties");

View File

@ -381,6 +381,7 @@ public class DefaultTransportService implements TransportService {
TbMsgMetaData metaData = new TbMsgMetaData(); TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("deviceName", sessionInfo.getDeviceName()); metaData.putValue("deviceName", sessionInfo.getDeviceName());
metaData.putValue("deviceType", sessionInfo.getDeviceType()); metaData.putValue("deviceType", sessionInfo.getDeviceType());
metaData.putValue("notifyDevice", "false");
RuleChainId ruleChainId = resolveRuleChainId(sessionInfo); RuleChainId ruleChainId = resolveRuleChainId(sessionInfo);
TbMsg tbMsg = TbMsg.newMsg(ServiceQueue.MAIN, SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), TbMsg tbMsg = TbMsg.newMsg(ServiceQueue.MAIN, SessionMsgType.POST_ATTRIBUTES_REQUEST.name(),
deviceId, metaData, gson.toJson(json), ruleChainId, null); deviceId, metaData, gson.toJson(json), ruleChainId, null);

View File

@ -77,7 +77,7 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository {
alarmFieldColumnMap.put("originator", "originator_name"); alarmFieldColumnMap.put("originator", "originator_name");
} }
private static final String SELECT_ORIGINATOR_NAME = " CASE" + private static final String SELECT_ORIGINATOR_NAME = " COALESCE(CASE" +
" WHEN a.originator_type = " + EntityType.TENANT.ordinal() + " WHEN a.originator_type = " + EntityType.TENANT.ordinal() +
" THEN (select title from tenant where id = a.originator_id)" + " THEN (select title from tenant where id = a.originator_id)" +
" WHEN a.originator_type = " + EntityType.CUSTOMER.ordinal() + " WHEN a.originator_type = " + EntityType.CUSTOMER.ordinal() +
@ -92,7 +92,7 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository {
" THEN (select name from device where id = a.originator_id)" + " THEN (select name from device where id = a.originator_id)" +
" WHEN a.originator_type = " + EntityType.ENTITY_VIEW.ordinal() + " WHEN a.originator_type = " + EntityType.ENTITY_VIEW.ordinal() +
" THEN (select name from entity_view where id = a.originator_id)" + " THEN (select name from entity_view where id = a.originator_id)" +
" END as originator_name"; " END, 'Deleted') as originator_name";
private static final String FIELDS_SELECTION = "select a.id as id," + private static final String FIELDS_SELECTION = "select a.id as id," +
" a.created_time as created_time," + " a.created_time as created_time," +

View File

@ -13,7 +13,7 @@ spring.jpa.database-platform=org.hibernate.dialect.HSQLDialect
spring.datasource.username=sa spring.datasource.username=sa
spring.datasource.password= spring.datasource.password=
spring.datasource.url=jdbc:hsqldb:file:/tmp/testDb;sql.enforce_size=false spring.datasource.url=jdbc:hsqldb:file:target/tmp/testDb;sql.enforce_size=false
spring.datasource.driverClassName=org.hsqldb.jdbc.JDBCDriver spring.datasource.driverClassName=org.hsqldb.jdbc.JDBCDriver
spring.datasource.hikari.maximumPoolSize = 50 spring.datasource.hikari.maximumPoolSize = 50

View File

@ -19,7 +19,7 @@
"azure-sb": "^0.11.1", "azure-sb": "^0.11.1",
"config": "^3.3.1", "config": "^3.3.1",
"js-yaml": "^3.14.0", "js-yaml": "^3.14.0",
"kafkajs": "^1.12.0", "kafkajs": "^1.14.0",
"long": "^4.0.0", "long": "^4.0.0",
"uuid-parse": "^1.1.0", "uuid-parse": "^1.1.0",
"uuid-random": "^1.3.2", "uuid-random": "^1.3.2",

View File

@ -27,20 +27,10 @@ let kafkaAdmin;
let consumer; let consumer;
let producer; let producer;
const topics = [];
const configEntries = []; const configEntries = [];
function KafkaProducer() { function KafkaProducer() {
this.send = async (responseTopic, scriptId, rawResponse, headers) => { this.send = async (responseTopic, scriptId, rawResponse, headers) => {
if (!topics.includes(responseTopic)) {
let createResponseTopicResult = await createTopic(responseTopic, 1);
topics.push(responseTopic);
if (createResponseTopicResult) {
logger.info('Created new topic: %s', requestTopic);
}
}
return producer.send( return producer.send(
{ {
topic: responseTopic, topic: responseTopic,
@ -99,10 +89,13 @@ function KafkaProducer() {
} }
} }
let createRequestTopicResult = await createTopic(requestTopic, partitions); let topics = await kafkaAdmin.listTopics();
if (createRequestTopicResult) { if (!topics.includes(requestTopic)) {
logger.info('Created new topic: %s', requestTopic); let createRequestTopicResult = await createTopic(requestTopic, partitions);
if (createRequestTopicResult) {
logger.info('Created new topic: %s', requestTopic);
}
} }
consumer = kafkaClient.consumer({groupId: 'js-executor-group'}); consumer = kafkaClient.consumer({groupId: 'js-executor-group'});

View File

@ -1665,12 +1665,10 @@ jws@^4.0.0:
jwa "^2.0.0" jwa "^2.0.0"
safe-buffer "^5.0.1" safe-buffer "^5.0.1"
kafkajs@^1.12.0: kafkajs@^1.14.0:
version "1.12.0" version "1.14.0"
resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-1.12.0.tgz#50ad336baee95f3324af8ae8df6fadc96e07c613" resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-1.14.0.tgz#3d998a77bfde54dc502e8e88690eedf0b21a1ed6"
integrity sha512-Izkd9iFRgeeKaHEgVpGQH08ygzCbHSxTbnu8W3G3uiNaVjGibUTmTwjv1Qf2M8NORXcPfzwVyg6bBlVj4SKr9g== integrity sha512-W+WCekiooY5rJP3Me5N3gWcQ8O6uG6lw0vv9t+sI+WqXKjKwj2+CWIXJy241x+ITE+1M1D19ABSiL2J8lKja5A==
dependencies:
long "^4.0.0"
keyv@^3.0.0: keyv@^3.0.0:
version "3.1.0" version "3.1.0"

View File

@ -37,7 +37,7 @@ Where:
> **NOTE**: **Windows** users should use docker managed volume instead of host's dir. Create docker volume (for ex. `mytb-data`) before executing `docker run` command: > **NOTE**: **Windows** users should use docker managed volume instead of host's dir. Create docker volume (for ex. `mytb-data`) before executing `docker run` command:
> ``` > ```
> $ docker create volume mytb-data > $ docker volume create mytb-data
> ``` > ```
> After you can execute docker run command using `mytb-data` volume instead of `~/.mytb-data`. > After you can execute docker run command using `mytb-data` volume instead of `~/.mytb-data`.
> In order to get access to necessary resources from external IP/Host on **Windows** machine, please execute the following commands: > In order to get access to necessary resources from external IP/Host on **Windows** machine, please execute the following commands:

View File

@ -16,7 +16,6 @@
package org.thingsboard.rule.engine.api; package org.thingsboard.rule.engine.api;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
@ -24,7 +23,6 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Set;
/** /**
* Created by ashvayka on 02.04.18. * Created by ashvayka on 02.04.18.
@ -37,6 +35,8 @@ public interface RuleEngineTelemetryService {
void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback); void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback);
void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback); void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback); void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback);

View File

@ -17,15 +17,13 @@ package org.thingsboard.rule.engine.telemetry;
import com.google.gson.JsonParser; import com.google.gson.JsonParser;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
@ -53,6 +51,9 @@ public class TbMsgAttributesNode implements TbNode {
@Override @Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbMsgAttributesNodeConfiguration.class); this.config = TbNodeUtils.convert(configuration, TbMsgAttributesNodeConfiguration.class);
if (config.getNotifyDevice() == null) {
config.setNotifyDevice(true);
}
} }
@Override @Override
@ -63,7 +64,15 @@ public class TbMsgAttributesNode implements TbNode {
} }
String src = msg.getData(); String src = msg.getData();
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src)); Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src));
ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), config.getScope(), new ArrayList<>(attributes), new TelemetryNodeCallback(ctx, msg)); String notifyDeviceStr = msg.getMetaData().getValue("notifyDevice");
ctx.getTelemetryService().saveAndNotify(
ctx.getTenantId(),
msg.getOriginator(),
config.getScope(),
new ArrayList<>(attributes),
config.getNotifyDevice() || StringUtils.isEmpty(notifyDeviceStr) || Boolean.parseBoolean(notifyDeviceStr),
new TelemetryNodeCallback(ctx, msg)
);
} }
@Override @Override

View File

@ -24,10 +24,13 @@ public class TbMsgAttributesNodeConfiguration implements NodeConfiguration<TbMsg
private String scope; private String scope;
private Boolean notifyDevice;
@Override @Override
public TbMsgAttributesNodeConfiguration defaultConfiguration() { public TbMsgAttributesNodeConfiguration defaultConfiguration() {
TbMsgAttributesNodeConfiguration configuration = new TbMsgAttributesNodeConfiguration(); TbMsgAttributesNodeConfiguration configuration = new TbMsgAttributesNodeConfiguration();
configuration.setScope(DataConstants.SERVER_SCOPE); configuration.setScope(DataConstants.SERVER_SCOPE);
configuration.setNotifyDevice(false);
return configuration; return configuration;
} }
} }