diff --git a/application/src/main/data/upgrade/2.6.0/schema_update.cql b/application/src/main/data/upgrade/2.6.0/schema_update.cql index f85b309836..6ac8f404bd 100644 --- a/application/src/main/data/upgrade/2.6.0/schema_update.cql +++ b/application/src/main/data/upgrade/2.6.0/schema_update.cql @@ -14,6 +14,38 @@ -- limitations under the License. -- +DROP MATERIALIZED VIEW IF EXISTS thingsboard.rule_chain_by_tenant_and_search_text; + +DROP TABLE IF EXISTS thingsboard.rule_chain; + +CREATE TABLE IF NOT EXISTS thingsboard.rule_chain ( + id uuid, + tenant_id uuid, + name text, + type text, + search_text text, + first_rule_node_id uuid, + root boolean, + debug_mode boolean, + configuration text, + additional_info text, + PRIMARY KEY (id, tenant_id, type) +); + +CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.rule_chain_by_tenant_and_search_text AS + SELECT * + from thingsboard.rule_chain + WHERE tenant_id IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL AND type IS NOT NULL + PRIMARY KEY ( tenant_id, search_text, id, type ) + WITH CLUSTERING ORDER BY ( search_text ASC, id DESC ); + +CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.rule_chain_by_tenant_by_type_and_search_text AS + SELECT * + from thingsboard.rule_chain + WHERE tenant_id IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL AND type IS NOT NULL + PRIMARY KEY ( tenant_id, type, search_text, id ) + WITH CLUSTERING ORDER BY ( type ASC, search_text ASC, id DESC ); + CREATE TABLE IF NOT EXISTS thingsboard.edge ( id timeuuid, tenant_id timeuuid, @@ -37,12 +69,12 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_name AS PRIMARY KEY ( tenant_id, name, id, customer_id, type) WITH CLUSTERING ORDER BY ( name ASC, id DESC, customer_id DESC); -CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_routing_key AS +CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_routing_key AS SELECT * from thingsboard.edge WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND routing_key IS NOT NULL AND id IS NOT NULL - PRIMARY KEY ( tenant_id, routing_key, id, customer_id, type) - WITH CLUSTERING ORDER BY ( routing_key ASC, id DESC, customer_id DESC); + PRIMARY KEY ( routing_key, tenant_id, id, customer_id, type) + WITH CLUSTERING ORDER BY ( tenant_id DESC, id DESC, customer_id DESC); CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_search_text AS SELECT * diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 01ad8a29e9..c9f6b94922 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -152,6 +152,15 @@ public class ThingsboardInstallService { databaseTsUpgradeService.upgradeDatabase("2.5.4"); } + case "2.5.5": + log.info("Upgrading ThingsBoard from version 2.5.5 to 2.6.0 ..."); + if (databaseTsUpgradeService != null) { + databaseTsUpgradeService.upgradeDatabase("2.5.5"); + } + databaseEntitiesUpgradeService.upgradeDatabase("2.5.5"); + + dataUpdateService.updateData("2.5.5"); + log.info("Updating system data..."); systemDataLoaderService.deleteSystemWidgetBundle("charts"); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 8e2990c75f..f6359275ea 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -150,6 +150,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } } catch (Exception e) { log.warn("Failed to process messages handling!", e); + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) {} } } }); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 375db62e80..4b76fb9731 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -252,52 +252,54 @@ public final class EdgeGrpcSession implements Closeable { } void processHandleMessages() throws ExecutionException, InterruptedException { - Long queueStartTs = getQueueStartTs().get(); - TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), queueStartTs, null, true); - TimePageData pageData; - UUID ifOffset = null; - boolean success = true; - do { - pageData = ctx.getEdgeNotificationService().findEdgeEvents(edge.getTenantId(), edge.getId(), pageLink); - if (isConnected() && !pageData.getData().isEmpty()) { - log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size()); - List downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData()); - log.trace("[{}] downlink msg(s) are going to be send.", downlinkMsgsPack.size()); + if (isConnected()) { + Long queueStartTs = getQueueStartTs().get(); + TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), queueStartTs, null, true); + TimePageData pageData; + UUID ifOffset = null; + boolean success = true; + do { + pageData = ctx.getEdgeNotificationService().findEdgeEvents(edge.getTenantId(), edge.getId(), pageLink); + if (isConnected() && !pageData.getData().isEmpty()) { + log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size()); + List downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData()); + log.trace("[{}] downlink msg(s) are going to be send.", downlinkMsgsPack.size()); - latch = new CountDownLatch(downlinkMsgsPack.size()); - for (DownlinkMsg downlinkMsg : downlinkMsgsPack) { - sendResponseMsg(ResponseMsg.newBuilder() - .setDownlinkMsg(downlinkMsg) - .build()); + latch = new CountDownLatch(downlinkMsgsPack.size()); + for (DownlinkMsg downlinkMsg : downlinkMsgsPack) { + sendResponseMsg(ResponseMsg.newBuilder() + .setDownlinkMsg(downlinkMsg) + .build()); + } + + ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId(); + + success = latch.await(10, TimeUnit.SECONDS); + if (!success) { + log.warn("Failed to deliver the batch: {}", downlinkMsgsPack); + } } - - ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId(); - - success = latch.await(10, TimeUnit.SECONDS); - if (!success) { - log.warn("Failed to deliver the batch: {}", downlinkMsgsPack); + if (isConnected() && (!success || pageData.hasNext())) { + try { + Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches()); + } catch (InterruptedException e) { + log.error("Error during sleep between batches", e); + } + if (success) { + pageLink = pageData.getNextPageLink(); + } } + } while (isConnected() && (!success || pageData.hasNext())); + + if (ifOffset != null) { + Long newStartTs = UUIDs.unixTimestamp(ifOffset); + updateQueueStartTs(newStartTs); } - if (isConnected() && (!success || pageData.hasNext())) { - try { - Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches()); - } catch (InterruptedException e) { - log.error("Error during sleep between batches", e); - } - if (success) { - pageLink = pageData.getNextPageLink(); - } + try { + Thread.sleep(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval()); + } catch (InterruptedException e) { + log.error("Error during sleep", e); } - } while (isConnected() && (!success || pageData.hasNext())); - - if (ifOffset != null) { - Long newStartTs = UUIDs.unixTimestamp(ifOffset); - updateQueueStartTs(newStartTs); - } - try { - Thread.sleep(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval()); - } catch (InterruptedException e) { - log.error("Error during sleep", e); } } diff --git a/application/src/main/java/org/thingsboard/server/service/install/AbstractSqlTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/AbstractSqlTsDatabaseUpgradeService.java index 5f084c7a9d..ae650592d2 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/AbstractSqlTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/AbstractSqlTsDatabaseUpgradeService.java @@ -50,7 +50,7 @@ public abstract class AbstractSqlTsDatabaseUpgradeService { @Autowired protected InstallScripts installScripts; - protected abstract void loadSql(Connection conn, String fileName); + protected abstract void loadSql(Connection conn, String version, String fileName); protected void loadFunctions(Path sqlFile, Connection conn) throws Exception { String sql = new String(Files.readAllBytes(sqlFile), StandardCharsets.UTF_8); diff --git a/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java index 29d698d0a4..7cd81d8832 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.dao.dashboard.DashboardService; import org.thingsboard.server.dao.util.NoSqlDao; import org.thingsboard.server.service.install.cql.CassandraDbHelper; @@ -44,6 +45,7 @@ import static org.thingsboard.server.service.install.DatabaseHelper.ENTITY_VIEWS import static org.thingsboard.server.service.install.DatabaseHelper.ID; import static org.thingsboard.server.service.install.DatabaseHelper.KEYS; import static org.thingsboard.server.service.install.DatabaseHelper.NAME; +import static org.thingsboard.server.service.install.DatabaseHelper.RULE_CHAIN; import static org.thingsboard.server.service.install.DatabaseHelper.SEARCH_TEXT; import static org.thingsboard.server.service.install.DatabaseHelper.START_TS; import static org.thingsboard.server.service.install.DatabaseHelper.TENANT_ID; @@ -306,17 +308,39 @@ public class CassandraDatabaseUpgradeService extends AbstractCassandraDatabaseUp } log.info("Schema updated."); break; - case "2.5.0": + case "2.5.5": + + log.info("Upgrading Cassandra DataBase from version {} to 2.6.0 ...", fromVersion); + + // Dump rule chains + + cluster.getSession(); + + ks = cluster.getCluster().getMetadata().getKeyspace(cluster.getKeyspaceName()); + + log.info("Dumping rule chains ..."); + Path ruleChainsDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), RULE_CHAIN, + new String[]{ID, TENANT_ID, NAME, SEARCH_TEXT, "first_rule_node_id", "root", "debug_mode", CONFIGURATION, ADDITIONAL_INFO, TYPE}, + new String[]{"", "", "", "", "", "", "", "", "", RuleChainType.CORE.name()}, + "tb-rule-chains"); + log.info("Rule chains dumped."); + log.info("Updating schema ..."); schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.6.0", SCHEMA_UPDATE_CQL); loadCql(schemaUpdateFile); - - try { - cluster.getSession().execute("alter table rule_chain add type text"); - Thread.sleep(2500); - } catch (InvalidQueryException e) {} log.info("Schema updated."); + + // Restore rule chains + + log.info("Restoring rule chains ..."); + if (ruleChainsDump != null) { + CassandraDbHelper.loadCf(ks, cluster.getSession(), RULE_CHAIN, + new String[]{ID, TENANT_ID, NAME, SEARCH_TEXT, "first_rule_node_id", "root", "debug_mode", CONFIGURATION, ADDITIONAL_INFO, TYPE}, ruleChainsDump); + Files.deleteIfExists(ruleChainsDump); + } + log.info("Rule chains restored."); break; + default: throw new RuntimeException("Unable to upgrade Cassandra database, unsupported fromVersion: " + fromVersion); } diff --git a/application/src/main/java/org/thingsboard/server/service/install/CassandraTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/CassandraTsDatabaseUpgradeService.java index 07b8522323..9be454fc9f 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/CassandraTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/CassandraTsDatabaseUpgradeService.java @@ -50,6 +50,7 @@ public class CassandraTsDatabaseUpgradeService extends AbstractCassandraDatabase break; case "2.5.0": case "2.5.4": + case "2.5.5": break; default: throw new RuntimeException("Unable to upgrade Cassandra database, unsupported fromVersion: " + fromVersion); diff --git a/application/src/main/java/org/thingsboard/server/service/install/DatabaseHelper.java b/application/src/main/java/org/thingsboard/server/service/install/DatabaseHelper.java index aea04bcf56..5c6b63debb 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/DatabaseHelper.java +++ b/application/src/main/java/org/thingsboard/server/service/install/DatabaseHelper.java @@ -57,6 +57,7 @@ public class DatabaseHelper { public static final String DASHBOARD = "dashboard"; public static final String ENTITY_VIEWS = "entity_views"; public static final String ENTITY_VIEW = "entity_view"; + public static final String RULE_CHAIN = "rule_chain"; public static final String ID = "id"; public static final String TITLE = "title"; public static final String TYPE = "type"; diff --git a/application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java b/application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java index ea36a61b48..27423259cd 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java +++ b/application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java @@ -115,6 +115,11 @@ public class InstallScripts { } } + public void createDefaultEdgeRuleChains(TenantId tenantId) { + Path tenantChainsDir = getTenantRuleChainsDir(); + loadRuleChainFromFile(tenantId, tenantChainsDir.resolve("edge_root_rule_chain.json")); + } + public void loadSystemWidgets() throws Exception { Path widgetBundlesDir = Paths.get(getDataDir(), JSON_DIR, SYSTEM_DIR, WIDGET_BUNDLES_DIR); try (DirectoryStream dirStream = Files.newDirectoryStream(widgetBundlesDir, path -> path.toString().endsWith(JSON_EXT))) { diff --git a/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java index 396f84664a..0bfa28e03a 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java @@ -94,7 +94,7 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe log.info("PostgreSQL version is valid!"); if (isOldSchema(conn, 2004003)) { log.info("Load upgrade functions ..."); - loadSql(conn, LOAD_FUNCTIONS_SQL); + loadSql(conn, "2.4.3", LOAD_FUNCTIONS_SQL); log.info("Updating timeseries schema ..."); executeQuery(conn, CALL_CREATE_PARTITION_TS_KV_TABLE); if (!partitionType.equals("INDEFINITE")) { @@ -179,9 +179,9 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe } log.info("Load TTL functions ..."); - loadSql(conn, LOAD_TTL_FUNCTIONS_SQL); + loadSql(conn, "2.4.3", LOAD_TTL_FUNCTIONS_SQL); log.info("Load Drop Partitions functions ..."); - loadSql(conn, LOAD_DROP_PARTITIONS_FUNCTIONS_SQL); + loadSql(conn, "2.4.3", LOAD_DROP_PARTITIONS_FUNCTIONS_SQL); executeQuery(conn, "UPDATE tb_schema_settings SET schema_version = 2005000"); @@ -198,7 +198,13 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe case "2.5.4": try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { log.info("Load Drop Partitions functions ..."); - loadSql(conn, LOAD_DROP_PARTITIONS_FUNCTIONS_SQL); + loadSql(conn, "2.4.3", LOAD_DROP_PARTITIONS_FUNCTIONS_SQL); + } + break; + case "2.5.5": + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { + log.info("Load TTL functions ..."); + loadSql(conn, "2.6.0", LOAD_TTL_FUNCTIONS_SQL); } break; default: @@ -236,8 +242,8 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe } @Override - protected void loadSql(Connection conn, String fileName) { - Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", fileName); + protected void loadSql(Connection conn, String version, String fileName) { + Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", version, fileName); try { loadFunctions(schemaUpdateFile, conn); log.info("Functions successfully loaded!"); diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index eda9cf88cb..a0c1456e0f 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -233,7 +233,7 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService log.info("Schema updated."); } break; - case "2.5.0": + case "2.5.5": try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { log.info("Updating schema ..."); schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.6.0", SCHEMA_UPDATE_SQL); diff --git a/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java index a929a51fb5..e47d6d7083 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java @@ -89,7 +89,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr log.info("PostgreSQL version is valid!"); if (isOldSchema(conn, 2004003)) { log.info("Load upgrade functions ..."); - loadSql(conn, LOAD_FUNCTIONS_SQL); + loadSql(conn, "2.4.3", LOAD_FUNCTIONS_SQL); log.info("Updating timescale schema ..."); executeQuery(conn, CALL_CREATE_TS_KV_LATEST_TABLE); executeQuery(conn, CALL_CREATE_NEW_TENANT_TS_KV_TABLE); @@ -165,7 +165,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr } log.info("Load TTL functions ..."); - loadSql(conn, LOAD_TTL_FUNCTIONS_SQL); + loadSql(conn, "2.4.3", LOAD_TTL_FUNCTIONS_SQL); executeQuery(conn, "UPDATE tb_schema_settings SET schema_version = 2005000"); log.info("schema timescale updated!"); @@ -179,6 +179,8 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr break; case "2.5.4": break; + case "2.5.5": + break; default: throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); } @@ -200,8 +202,8 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr } @Override - protected void loadSql(Connection conn, String fileName) { - Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.3", fileName); + protected void loadSql(Connection conn, String version, String fileName) { + Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", version, fileName); try { loadFunctions(schemaUpdateFile, conn); log.info("Functions successfully loaded!"); diff --git a/application/src/main/java/org/thingsboard/server/service/install/cql/CassandraDbHelper.java b/application/src/main/java/org/thingsboard/server/service/install/cql/CassandraDbHelper.java index fb75a12dac..0e43a6bd5f 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/cql/CassandraDbHelper.java +++ b/application/src/main/java/org/thingsboard/server/service/install/cql/CassandraDbHelper.java @@ -157,6 +157,8 @@ public class CassandraDbHelper { str = new Float(row.getFloat(index)).toString(); } else if (type == DataType.timestamp()) { str = ""+row.getTimestamp(index).getTime(); + } else if (type == DataType.cboolean()) { + str = ""+ row.getBool(index); } else { str = row.getString(index); } @@ -205,6 +207,8 @@ public class CassandraDbHelper { boundStatement.setFloat(column, Float.valueOf(value)); } else if (type == DataType.timestamp()) { boundStatement.setTimestamp(column, new Date(Long.valueOf(value))); + } else if (type == DataType.cboolean()) { + boundStatement.setBool(column, Boolean.valueOf(value)); } else { boundStatement.setString(column, value); } diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index 0d2401b0fd..77b356797c 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -19,9 +19,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; -import org.thingsboard.server.common.data.SearchTextBased; import org.thingsboard.server.common.data.Tenant; -import org.thingsboard.server.common.data.id.UUIDBased; import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageLink; import org.thingsboard.server.common.data.rule.RuleChain; @@ -50,6 +48,10 @@ public class DefaultDataUpdateService implements DataUpdateService { log.info("Updating data from version 1.4.0 to 2.0.0 ..."); tenantsDefaultRuleChainUpdater.updateEntities(null); break; + case "2.5.5": + log.info("Updating data from version 2.5.5 to 2.6.0 ..."); + tenantsDefaultEdgeRuleChainUpdater.updateEntities(null); + break; default: throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); } @@ -76,4 +78,24 @@ public class DefaultDataUpdateService implements DataUpdateService { } }; + private PaginatedUpdater tenantsDefaultEdgeRuleChainUpdater = + new PaginatedUpdater() { + + @Override + protected TextPageData findEntities(String region, TextPageLink pageLink) { + return tenantService.findTenants(pageLink); + } + + @Override + protected void updateEntity(Tenant tenant) { + try { + RuleChain defaultEdgeRuleChain = ruleChainService.getDefaultRootEdgeRuleChain(tenant.getId()); + if (defaultEdgeRuleChain == null) { + installScripts.createDefaultEdgeRuleChains(tenant.getId()); + } + } catch (Exception e) { + log.error("Unable to update Tenant", e); + } + } + }; } \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/CassandraEdgeDao.java b/dao/src/main/java/org/thingsboard/server/dao/edge/CassandraEdgeDao.java index 3d77b5fa78..7705188132 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/CassandraEdgeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/CassandraEdgeDao.java @@ -27,7 +27,6 @@ import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.edge.Edge; @@ -57,8 +56,8 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.in; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_CUSTOMER_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME; import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_CUSTOMER_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME; +import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_ROUTING_KEY_VIEW_NAME; import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_TENANT_AND_NAME_VIEW_NAME; -import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_TENANT_AND_ROUTING_KEY_VIEW_NAME; import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME; import static org.thingsboard.server.dao.model.ModelConstants.EDGE_BY_TENANT_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME; import static org.thingsboard.server.dao.model.ModelConstants.EDGE_COLUMN_FAMILY_NAME; @@ -203,9 +202,8 @@ public class CassandraEdgeDao extends CassandraAbstractSearchTextDao findByRoutingKey(UUID tenantId, String routingKey) { - Select select = select().from(EDGE_BY_TENANT_AND_ROUTING_KEY_VIEW_NAME); + Select select = select().from(EDGE_BY_ROUTING_KEY_VIEW_NAME); Select.Where query = select.where(); - query.and(eq(EDGE_TENANT_ID_PROPERTY, tenantId)); query.and(eq(EDGE_ROUTING_KEY_PROPERTY, routingKey)); return Optional.ofNullable(DaoUtil.getData(findOneByStatement(new TenantId(tenantId), query))); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java index 98e7177d33..54a65077c5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java @@ -373,7 +373,7 @@ public class ModelConstants { public static final String EDGE_BY_CUSTOMER_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "edge_by_customer_and_search_text"; public static final String EDGE_BY_CUSTOMER_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "edge_by_customer_by_type_and_search_text"; public static final String EDGE_BY_TENANT_AND_NAME_VIEW_NAME = "edge_by_tenant_and_name"; - public static final String EDGE_BY_TENANT_AND_ROUTING_KEY_VIEW_NAME = "edge_by_tenant_and_routing_key"; + public static final String EDGE_BY_ROUTING_KEY_VIEW_NAME = "edge_by_routing_key"; public static final String EDGE_ROUTING_KEY_PROPERTY = "routing_key"; public static final String EDGE_SECRET_PROPERTY = "secret"; diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/CassandraRuleChainDao.java b/dao/src/main/java/org/thingsboard/server/dao/rule/CassandraRuleChainDao.java index d636281c2a..9dfc869afd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/CassandraRuleChainDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/CassandraRuleChainDao.java @@ -22,9 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.EdgeId; -import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.TextPageLink; import org.thingsboard.server.common.data.page.TimePageLink; @@ -45,8 +43,6 @@ import java.util.List; import java.util.UUID; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; -import static org.thingsboard.server.dao.model.ModelConstants.DEVICE_TENANT_ID_PROPERTY; -import static org.thingsboard.server.dao.model.ModelConstants.DEVICE_TYPE_PROPERTY; import static org.thingsboard.server.dao.model.ModelConstants.RULE_CHAIN_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME; import static org.thingsboard.server.dao.model.ModelConstants.RULE_CHAIN_BY_TENANT_BY_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME; import static org.thingsboard.server.dao.model.ModelConstants.RULE_CHAIN_COLUMN_FAMILY_NAME; diff --git a/dao/src/main/resources/cassandra/schema-entities.cql b/dao/src/main/resources/cassandra/schema-entities.cql index 7375f7e9e7..a5518662ae 100644 --- a/dao/src/main/resources/cassandra/schema-entities.cql +++ b/dao/src/main/resources/cassandra/schema-entities.cql @@ -749,12 +749,12 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_name AS PRIMARY KEY ( tenant_id, name, id, customer_id, type) WITH CLUSTERING ORDER BY ( name ASC, id DESC, customer_id DESC); -CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_routing_key AS +CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_routing_key AS SELECT * from thingsboard.edge WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND routing_key IS NOT NULL AND id IS NOT NULL - PRIMARY KEY ( tenant_id, routing_key, id, customer_id, type) - WITH CLUSTERING ORDER BY ( routing_key ASC, id DESC, customer_id DESC); + PRIMARY KEY ( routing_key, tenant_id, id, customer_id, type) + WITH CLUSTERING ORDER BY ( tenant_id DESC, id DESC, customer_id DESC); CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_search_text AS SELECT *