diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 842550d209..e0daf43bf7 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -28,6 +28,7 @@ import org.thingsboard.server.service.install.DatabaseEntitiesUpgradeService; import org.thingsboard.server.service.install.EntityDatabaseSchemaService; import org.thingsboard.server.service.install.SystemDataLoaderService; import org.thingsboard.server.service.install.TsDatabaseSchemaService; +import org.thingsboard.server.service.install.migrate.EntitiesMigrateService; import org.thingsboard.server.service.install.update.DataUpdateService; @Service @@ -68,99 +69,95 @@ public class ThingsboardInstallService { @Autowired private DataUpdateService dataUpdateService; + @Autowired(required = false) + private EntitiesMigrateService entitiesMigrateService; + public void performInstall() { try { if (isUpgrade) { log.info("Starting ThingsBoard Upgrade from version {} ...", upgradeFromVersion); - switch (upgradeFromVersion) { - case "1.2.3": //NOSONAR, Need to execute gradual upgrade starting from upgradeFromVersion - log.info("Upgrading ThingsBoard from version 1.2.3 to 1.3.0 ..."); + if ("2.5.0-cassandra".equals(upgradeFromVersion)) { + log.info("Migrating ThingsBoard entities data from cassandra to SQL database ..."); + entitiesMigrateService.migrate(); + log.info("Updating system data..."); + systemDataLoaderService.updateSystemWidgets(); + } else { + switch (upgradeFromVersion) { + case "1.2.3": //NOSONAR, Need to execute gradual upgrade starting from upgradeFromVersion + log.info("Upgrading ThingsBoard from version 1.2.3 to 1.3.0 ..."); - databaseEntitiesUpgradeService.upgradeDatabase("1.2.3"); + databaseEntitiesUpgradeService.upgradeDatabase("1.2.3"); - case "1.3.0": //NOSONAR, Need to execute gradual upgrade starting from upgradeFromVersion - log.info("Upgrading ThingsBoard from version 1.3.0 to 1.3.1 ..."); + case "1.3.0": //NOSONAR, Need to execute gradual upgrade starting from upgradeFromVersion + log.info("Upgrading ThingsBoard from version 1.3.0 to 1.3.1 ..."); - databaseEntitiesUpgradeService.upgradeDatabase("1.3.0"); + databaseEntitiesUpgradeService.upgradeDatabase("1.3.0"); - case "1.3.1": //NOSONAR, Need to execute gradual upgrade starting from upgradeFromVersion - log.info("Upgrading ThingsBoard from version 1.3.1 to 1.4.0 ..."); + case "1.3.1": //NOSONAR, Need to execute gradual upgrade starting from upgradeFromVersion + log.info("Upgrading ThingsBoard from version 1.3.1 to 1.4.0 ..."); - databaseEntitiesUpgradeService.upgradeDatabase("1.3.1"); + databaseEntitiesUpgradeService.upgradeDatabase("1.3.1"); - case "1.4.0": - log.info("Upgrading ThingsBoard from version 1.4.0 to 2.0.0 ..."); + case "1.4.0": + log.info("Upgrading ThingsBoard from version 1.4.0 to 2.0.0 ..."); - databaseEntitiesUpgradeService.upgradeDatabase("1.4.0"); + databaseEntitiesUpgradeService.upgradeDatabase("1.4.0"); - dataUpdateService.updateData("1.4.0"); + dataUpdateService.updateData("1.4.0"); - case "2.0.0": - log.info("Upgrading ThingsBoard from version 2.0.0 to 2.1.1 ..."); + case "2.0.0": + log.info("Upgrading ThingsBoard from version 2.0.0 to 2.1.1 ..."); - databaseEntitiesUpgradeService.upgradeDatabase("2.0.0"); + databaseEntitiesUpgradeService.upgradeDatabase("2.0.0"); - case "2.1.1": - log.info("Upgrading ThingsBoard from version 2.1.1 to 2.1.2 ..."); + case "2.1.1": + log.info("Upgrading ThingsBoard from version 2.1.1 to 2.1.2 ..."); - databaseEntitiesUpgradeService.upgradeDatabase("2.1.1"); - case "2.1.3": - log.info("Upgrading ThingsBoard from version 2.1.3 to 2.2.0 ..."); + databaseEntitiesUpgradeService.upgradeDatabase("2.1.1"); + case "2.1.3": + log.info("Upgrading ThingsBoard from version 2.1.3 to 2.2.0 ..."); - databaseEntitiesUpgradeService.upgradeDatabase("2.1.3"); + databaseEntitiesUpgradeService.upgradeDatabase("2.1.3"); - case "2.3.0": - log.info("Upgrading ThingsBoard from version 2.3.0 to 2.3.1 ..."); + case "2.3.0": + log.info("Upgrading ThingsBoard from version 2.3.0 to 2.3.1 ..."); - databaseEntitiesUpgradeService.upgradeDatabase("2.3.0"); + databaseEntitiesUpgradeService.upgradeDatabase("2.3.0"); - case "2.3.1": - log.info("Upgrading ThingsBoard from version 2.3.1 to 2.4.0 ..."); + case "2.3.1": + log.info("Upgrading ThingsBoard from version 2.3.1 to 2.4.0 ..."); - databaseEntitiesUpgradeService.upgradeDatabase("2.3.1"); + databaseEntitiesUpgradeService.upgradeDatabase("2.3.1"); - case "2.4.0": - log.info("Upgrading ThingsBoard from version 2.4.0 to 2.4.1 ..."); + case "2.4.0": + log.info("Upgrading ThingsBoard from version 2.4.0 to 2.4.1 ..."); - case "2.4.1": - log.info("Upgrading ThingsBoard from version 2.4.1 to 2.4.2 ..."); + case "2.4.1": + log.info("Upgrading ThingsBoard from version 2.4.1 to 2.4.2 ..."); - databaseEntitiesUpgradeService.upgradeDatabase("2.4.1"); - case "2.4.2": - log.info("Upgrading ThingsBoard from version 2.4.2 to 2.4.3 ..."); + databaseEntitiesUpgradeService.upgradeDatabase("2.4.1"); + case "2.4.2": + log.info("Upgrading ThingsBoard from version 2.4.2 to 2.4.3 ..."); - databaseEntitiesUpgradeService.upgradeDatabase("2.4.2"); + databaseEntitiesUpgradeService.upgradeDatabase("2.4.2"); - case "2.4.3": - log.info("Upgrading ThingsBoard from version 2.4.3 to 2.5 ..."); + case "2.4.3": + log.info("Upgrading ThingsBoard from version 2.4.3 to 2.5 ..."); - if (databaseTsUpgradeService != null) { - databaseTsUpgradeService.upgradeDatabase("2.4.3"); - } - databaseEntitiesUpgradeService.upgradeDatabase("2.4.3"); + if (databaseTsUpgradeService != null) { + databaseTsUpgradeService.upgradeDatabase("2.4.3"); + } + databaseEntitiesUpgradeService.upgradeDatabase("2.4.3"); - log.info("Updating system data..."); + log.info("Updating system data..."); - systemDataLoaderService.deleteSystemWidgetBundle("charts"); - systemDataLoaderService.deleteSystemWidgetBundle("cards"); - systemDataLoaderService.deleteSystemWidgetBundle("maps"); - systemDataLoaderService.deleteSystemWidgetBundle("analogue_gauges"); - systemDataLoaderService.deleteSystemWidgetBundle("digital_gauges"); - systemDataLoaderService.deleteSystemWidgetBundle("gpio_widgets"); - systemDataLoaderService.deleteSystemWidgetBundle("alarm_widgets"); - systemDataLoaderService.deleteSystemWidgetBundle("control_widgets"); - systemDataLoaderService.deleteSystemWidgetBundle("maps_v2"); - systemDataLoaderService.deleteSystemWidgetBundle("gateway_widgets"); - systemDataLoaderService.deleteSystemWidgetBundle("input_widgets"); - systemDataLoaderService.deleteSystemWidgetBundle("date"); - systemDataLoaderService.deleteSystemWidgetBundle("entity_admin_widgets"); - - systemDataLoaderService.loadSystemWidgets(); - break; - default: - throw new RuntimeException("Unable to upgrade ThingsBoard, unsupported fromVersion: " + upgradeFromVersion); + systemDataLoaderService.updateSystemWidgets(); + break; + default: + throw new RuntimeException("Unable to upgrade ThingsBoard, unsupported fromVersion: " + upgradeFromVersion); + } } log.info("Upgrade finished successfully!"); diff --git a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java index fd907714fa..4f35509429 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java @@ -168,6 +168,24 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { installScripts.loadSystemWidgets(); } + @Override + public void updateSystemWidgets() throws Exception { + this.deleteSystemWidgetBundle("charts"); + this.deleteSystemWidgetBundle("cards"); + this.deleteSystemWidgetBundle("maps"); + this.deleteSystemWidgetBundle("analogue_gauges"); + this.deleteSystemWidgetBundle("digital_gauges"); + this.deleteSystemWidgetBundle("gpio_widgets"); + this.deleteSystemWidgetBundle("alarm_widgets"); + this.deleteSystemWidgetBundle("control_widgets"); + this.deleteSystemWidgetBundle("maps_v2"); + this.deleteSystemWidgetBundle("gateway_widgets"); + this.deleteSystemWidgetBundle("input_widgets"); + this.deleteSystemWidgetBundle("date"); + this.deleteSystemWidgetBundle("entity_admin_widgets"); + installScripts.loadSystemWidgets(); + } + private User createUser(Authority authority, TenantId tenantId, CustomerId customerId, diff --git a/application/src/main/java/org/thingsboard/server/service/install/SystemDataLoaderService.java b/application/src/main/java/org/thingsboard/server/service/install/SystemDataLoaderService.java index e386e84132..76e65deaa4 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SystemDataLoaderService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SystemDataLoaderService.java @@ -23,6 +23,8 @@ public interface SystemDataLoaderService { void loadSystemWidgets() throws Exception; + void updateSystemWidgets() throws Exception; + void loadDemoData() throws Exception; void deleteSystemWidgetBundle(String bundleAlias) throws Exception; diff --git a/application/src/main/java/org/thingsboard/server/service/install/migrate/CassandraEntitiesToSqlMigrateService.java b/application/src/main/java/org/thingsboard/server/service/install/migrate/CassandraEntitiesToSqlMigrateService.java new file mode 100644 index 0000000000..e5d7844ba4 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/install/migrate/CassandraEntitiesToSqlMigrateService.java @@ -0,0 +1,253 @@ +package org.thingsboard.server.service.install.migrate; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.util.SqlDao; +import org.thingsboard.server.service.install.EntityDatabaseSchemaService; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Arrays; +import java.util.List; + +import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.bigintColumn; +import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.booleanColumn; +import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.doubleColumn; +import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.enumToIntColumn; +import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.idColumn; +import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.stringColumn; + +@Service +@Profile("install") +@SqlDao +@Slf4j +public class CassandraEntitiesToSqlMigrateService implements EntitiesMigrateService { + + @Autowired + private EntityDatabaseSchemaService entityDatabaseSchemaService; + + @Autowired + protected CassandraCluster cluster; + + @Value("${spring.datasource.url}") + protected String dbUrl; + + @Value("${spring.datasource.username}") + protected String dbUserName; + + @Value("${spring.datasource.password}") + protected String dbPassword; + + + @Override + public void migrate() throws Exception { + log.info("Performing migration of entities data from cassandra to SQL database ..."); + entityDatabaseSchemaService.createDatabaseSchema(); + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { + conn.setAutoCommit(true); + for (CassandraToSqlTable table: tables) { + table.migrateToSql(cluster.getSession(), conn); + } + } catch (Exception e) { + log.error("Unexpected error during ThingsBoard entities data migration!", e); + throw e; + } + } + + private static List tables = Arrays.asList( + new CassandraToSqlTable("admin_settings", + idColumn("id"), + stringColumn("key"), + stringColumn("json_value")), + new CassandraToSqlTable("alarm", + idColumn("id"), + idColumn("tenant_id"), + stringColumn("type"), + idColumn("originator_id"), + enumToIntColumn("originator_type", EntityType.class), + stringColumn("severity"), + stringColumn("status"), + bigintColumn("start_ts"), + bigintColumn("end_ts"), + bigintColumn("ack_ts"), + bigintColumn("clear_ts"), + stringColumn("details", "additional_info"), + booleanColumn("propagate"), + stringColumn("propagate_relation_types")), + new CassandraToSqlTable("asset", + idColumn("id"), + idColumn("tenant_id"), + idColumn("customer_id"), + stringColumn("name"), + stringColumn("type"), + stringColumn("label"), + stringColumn("search_text"), + stringColumn("additional_info")), + new CassandraToSqlTable("audit_log_by_tenant_id", "audit_log", + idColumn("id"), + idColumn("tenant_id"), + idColumn("customer_id"), + idColumn("entity_id"), + stringColumn("entity_type"), + stringColumn("entity_name"), + idColumn("user_id"), + stringColumn("user_name"), + stringColumn("action_type"), + stringColumn("action_data"), + stringColumn("action_status"), + stringColumn("action_failure_details")), + new CassandraToSqlTable("attributes_kv_cf", "attribute_kv", + idColumn("entity_id"), + stringColumn("entity_type"), + stringColumn("attribute_type"), + stringColumn("attribute_key"), + booleanColumn("bool_v"), + stringColumn("str_v"), + bigintColumn("long_v"), + doubleColumn("dbl_v"), + stringColumn("json_v"), + bigintColumn("last_update_ts")), + new CassandraToSqlTable("component_descriptor", + idColumn("id"), + stringColumn("type"), + stringColumn("scope"), + stringColumn("name"), + stringColumn("search_text"), + stringColumn("clazz"), + stringColumn("configuration_descriptor"), + stringColumn("actions")), + new CassandraToSqlTable("customer", + idColumn("id"), + idColumn("tenant_id"), + stringColumn("title"), + stringColumn("search_text"), + stringColumn("country"), + stringColumn("state"), + stringColumn("city"), + stringColumn("address"), + stringColumn("address2"), + stringColumn("zip"), + stringColumn("phone"), + stringColumn("email"), + stringColumn("additional_info")), + new CassandraToSqlTable("dashboard", + idColumn("id"), + idColumn("tenant_id"), + stringColumn("title"), + stringColumn("search_text"), + stringColumn("assigned_customers"), + stringColumn("configuration")), + new CassandraToSqlTable("device", + idColumn("id"), + idColumn("tenant_id"), + idColumn("customer_id"), + stringColumn("name"), + stringColumn("type"), + stringColumn("label"), + stringColumn("search_text"), + stringColumn("additional_info")), + new CassandraToSqlTable("device_credentials", + idColumn("id"), + idColumn("device_id"), + stringColumn("credentials_type"), + stringColumn("credentials_id"), + stringColumn("credentials_value")), + new CassandraToSqlTable("event", + idColumn("id"), + idColumn("tenant_id"), + idColumn("entity_id"), + stringColumn("entity_type"), + stringColumn("event_type"), + stringColumn("event_uid"), + stringColumn("body")), + new CassandraToSqlTable("relation", + idColumn("from_id"), + stringColumn("from_type"), + idColumn("to_id"), + stringColumn("to_type"), + stringColumn("relation_type_group"), + stringColumn("relation_type"), + stringColumn("additional_info")), + new CassandraToSqlTable("user", "tb_user", + idColumn("id"), + idColumn("tenant_id"), + idColumn("customer_id"), + stringColumn("email"), + stringColumn("search_text"), + stringColumn("authority"), + stringColumn("first_name"), + stringColumn("last_name"), + stringColumn("additional_info")), + new CassandraToSqlTable("tenant", + idColumn("id"), + stringColumn("title"), + stringColumn("search_text"), + stringColumn("region"), + stringColumn("country"), + stringColumn("state"), + stringColumn("city"), + stringColumn("address"), + stringColumn("address2"), + stringColumn("zip"), + stringColumn("phone"), + stringColumn("email"), + stringColumn("additional_info")), + new CassandraToSqlTable("user_credentials", + idColumn("id"), + idColumn("user_id"), + booleanColumn("enabled"), + stringColumn("password"), + stringColumn("activate_token"), + stringColumn("reset_token")), + new CassandraToSqlTable("widget_type", + idColumn("id"), + idColumn("tenant_id"), + stringColumn("bundle_alias"), + stringColumn("alias"), + stringColumn("name"), + stringColumn("descriptor")), + new CassandraToSqlTable("widgets_bundle", + idColumn("id"), + idColumn("tenant_id"), + stringColumn("alias"), + stringColumn("title"), + stringColumn("search_text")), + new CassandraToSqlTable("rule_chain", + idColumn("id"), + idColumn("tenant_id"), + stringColumn("name"), + stringColumn("search_text"), + idColumn("first_rule_node_id"), + booleanColumn("root"), + booleanColumn("debug_mode"), + stringColumn("configuration"), + stringColumn("additional_info")), + new CassandraToSqlTable("rule_node", + idColumn("id"), + idColumn("rule_chain_id"), + stringColumn("type"), + stringColumn("name"), + booleanColumn("debug_mode"), + stringColumn("search_text"), + stringColumn("configuration"), + stringColumn("additional_info")), + new CassandraToSqlTable("entity_view", + idColumn("id"), + idColumn("tenant_id"), + idColumn("customer_id"), + idColumn("entity_id"), + stringColumn("entity_type"), + stringColumn("name"), + stringColumn("type"), + stringColumn("keys"), + bigintColumn("start_ts"), + bigintColumn("end_ts"), + stringColumn("search_text"), + stringColumn("additional_info")) + ); +} diff --git a/application/src/main/java/org/thingsboard/server/service/install/migrate/CassandraToSqlColumn.java b/application/src/main/java/org/thingsboard/server/service/install/migrate/CassandraToSqlColumn.java new file mode 100644 index 0000000000..56d1d53833 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/install/migrate/CassandraToSqlColumn.java @@ -0,0 +1,159 @@ +package org.thingsboard.server.service.install.migrate; + +import com.datastax.driver.core.Row; +import lombok.Data; +import org.thingsboard.server.common.data.UUIDConverter; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; + +@Data +public class CassandraToSqlColumn { + + private String cassandraColumnName; + private String sqlColumnName; + private CassandraToSqlColumnType type; + private int sqlType; + private Class enumClass; + + public static CassandraToSqlColumn idColumn(String name) { + return new CassandraToSqlColumn(name, CassandraToSqlColumnType.ID); + } + + public static CassandraToSqlColumn stringColumn(String name) { + return new CassandraToSqlColumn(name, CassandraToSqlColumnType.STRING); + } + + public static CassandraToSqlColumn stringColumn(String cassandraColumnName, String sqlColumnName) { + return new CassandraToSqlColumn(cassandraColumnName, sqlColumnName); + } + + public static CassandraToSqlColumn bigintColumn(String name) { + return new CassandraToSqlColumn(name, CassandraToSqlColumnType.BIGINT); + } + + public static CassandraToSqlColumn doubleColumn(String name) { + return new CassandraToSqlColumn(name, CassandraToSqlColumnType.DOUBLE); + } + + public static CassandraToSqlColumn booleanColumn(String name) { + return new CassandraToSqlColumn(name, CassandraToSqlColumnType.BOOLEAN); + } + + public static CassandraToSqlColumn enumToIntColumn(String name, Class enumClass) { + return new CassandraToSqlColumn(name, CassandraToSqlColumnType.ENUM_TO_INT, enumClass); + } + + public CassandraToSqlColumn(String columnName) { + this(columnName, columnName, CassandraToSqlColumnType.STRING, null); + } + + public CassandraToSqlColumn(String columnName, CassandraToSqlColumnType type) { + this(columnName, columnName, type, null); + } + + public CassandraToSqlColumn(String columnName, CassandraToSqlColumnType type, Class enumClass) { + this(columnName, columnName, type, enumClass); + } + + public CassandraToSqlColumn(String cassandraColumnName, String sqlColumnName) { + this(cassandraColumnName, sqlColumnName, CassandraToSqlColumnType.STRING, null); + } + + public CassandraToSqlColumn(String cassandraColumnName, String sqlColumnName, CassandraToSqlColumnType type, + Class enumClass) { + this.cassandraColumnName = cassandraColumnName; + this.sqlColumnName = sqlColumnName; + this.type = type; + this.enumClass = enumClass; + switch (this.type) { + case ID: + case STRING: + this.sqlType = Types.VARCHAR; + break; + case DOUBLE: + this.sqlType = Types.DOUBLE; + break; + case INTEGER: + case ENUM_TO_INT: + this.sqlType = Types.INTEGER; + break; + case FLOAT: + this.sqlType = Types.FLOAT; + break; + case BIGINT: + this.sqlType = Types.BIGINT; + break; + case BOOLEAN: + this.sqlType = Types.BOOLEAN; + break; + } + } + + public void prepareColumnValue(Row row, PreparedStatement sqlInsertStatement, int index) throws SQLException { + String value = this.getColumnValue(row, index); + this.setColumnValue(sqlInsertStatement, index, value); + } + + private String getColumnValue(Row row, int index) { + if (row.isNull(index)) { + return null; + } else { + switch (this.type) { + case ID: + return UUIDConverter.fromTimeUUID(row.getUUID(index)); + case DOUBLE: + return Double.toString(row.getDouble(index)); + case INTEGER: + return Integer.toString(row.getInt(index)); + case FLOAT: + return Float.toString(row.getFloat(index)); + case BIGINT: + return Long.toString(row.getLong(index)); + case BOOLEAN: + return Boolean.toString(row.getBool(index)); + case STRING: + case ENUM_TO_INT: + default: + return row.getString(index); + } + } + } + + private void setColumnValue(PreparedStatement sqlInsertStatement, int index, String value) throws SQLException { + if (value == null) { + sqlInsertStatement.setNull(index, this.sqlType); + } else { + switch (this.type) { + case DOUBLE: + sqlInsertStatement.setDouble(index, Double.parseDouble(value)); + break; + case INTEGER: + sqlInsertStatement.setInt(index, Integer.parseInt(value)); + break; + case FLOAT: + sqlInsertStatement.setFloat(index, Float.parseFloat(value)); + break; + case BIGINT: + sqlInsertStatement.setLong(index, Long.parseLong(value)); + break; + case BOOLEAN: + sqlInsertStatement.setBoolean(index, Boolean.parseBoolean(value)); + break; + case ENUM_TO_INT: + Enum enumVal = Enum.valueOf(this.enumClass, value); + int intValue = enumVal.ordinal(); + sqlInsertStatement.setInt(index, intValue); + break; + case STRING: + case ID: + default: + sqlInsertStatement.setString(index, value); + break; + } + } + } + +} + diff --git a/application/src/main/java/org/thingsboard/server/service/install/migrate/CassandraToSqlColumnType.java b/application/src/main/java/org/thingsboard/server/service/install/migrate/CassandraToSqlColumnType.java new file mode 100644 index 0000000000..c20484a898 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/install/migrate/CassandraToSqlColumnType.java @@ -0,0 +1,12 @@ +package org.thingsboard.server.service.install.migrate; + +public enum CassandraToSqlColumnType { + ID, + DOUBLE, + INTEGER, + FLOAT, + BIGINT, + BOOLEAN, + STRING, + ENUM_TO_INT +} diff --git a/application/src/main/java/org/thingsboard/server/service/install/migrate/CassandraToSqlTable.java b/application/src/main/java/org/thingsboard/server/service/install/migrate/CassandraToSqlTable.java new file mode 100644 index 0000000000..fc2fcf78db --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/install/migrate/CassandraToSqlTable.java @@ -0,0 +1,99 @@ +package org.thingsboard.server.service.install.migrate; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.Statement; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +@Data +@Slf4j +public class CassandraToSqlTable { + + private String cassandraCf; + private String sqlTableName; + + private List columns; + + public CassandraToSqlTable(String tableName, CassandraToSqlColumn... columns) { + this(tableName, tableName, columns); + } + + public CassandraToSqlTable(String cassandraCf, String sqlTableName, CassandraToSqlColumn... columns) { + this.cassandraCf = cassandraCf; + this.sqlTableName = sqlTableName; + this.columns = Arrays.asList(columns); + } + + public void migrateToSql(Session session, Connection conn) throws SQLException { + log.info("Migrating data from cassandra '{}' Column Family to '{}' SQL table...", this.cassandraCf, this.sqlTableName); + PreparedStatement sqlInsertStatement = createSqlInsertStatement(conn); + Statement cassandraSelectStatement = createCassandraSelectStatement(); + cassandraSelectStatement.setFetchSize(100); + ResultSet rs = session.execute(cassandraSelectStatement); + Iterator iter = rs.iterator(); + int rowCounter = 0; + while (iter.hasNext()) { + Row row = iter.next(); + if (row != null) { + this.migrateRowToSql(row, sqlInsertStatement); + rowCounter++; + if (rowCounter % 100 == 0) { + sqlInsertStatement.executeBatch(); + log.info("{} records migrated so far...", rowCounter); + } + } + } + if (rowCounter % 100 > 0) { + sqlInsertStatement.executeBatch(); + } + sqlInsertStatement.close(); + log.info("{} total records migrated.", rowCounter); + log.info("Finished migration data from cassandra '{}' Column Family to '{}' SQL table.", this.cassandraCf, this.sqlTableName); + } + + private void migrateRowToSql(Row row, PreparedStatement sqlInsertStatement) throws SQLException { + for (int i=0; i