Cassandra 2.5.0 to SQL migration

This commit is contained in:
Igor Kulikov 2020-03-02 17:43:10 +02:00
parent e7d00f4b68
commit b490e4ef4d
8 changed files with 609 additions and 62 deletions

View File

@ -28,6 +28,7 @@ import org.thingsboard.server.service.install.DatabaseEntitiesUpgradeService;
import org.thingsboard.server.service.install.EntityDatabaseSchemaService; import org.thingsboard.server.service.install.EntityDatabaseSchemaService;
import org.thingsboard.server.service.install.SystemDataLoaderService; import org.thingsboard.server.service.install.SystemDataLoaderService;
import org.thingsboard.server.service.install.TsDatabaseSchemaService; import org.thingsboard.server.service.install.TsDatabaseSchemaService;
import org.thingsboard.server.service.install.migrate.EntitiesMigrateService;
import org.thingsboard.server.service.install.update.DataUpdateService; import org.thingsboard.server.service.install.update.DataUpdateService;
@Service @Service
@ -68,99 +69,95 @@ public class ThingsboardInstallService {
@Autowired @Autowired
private DataUpdateService dataUpdateService; private DataUpdateService dataUpdateService;
@Autowired(required = false)
private EntitiesMigrateService entitiesMigrateService;
public void performInstall() { public void performInstall() {
try { try {
if (isUpgrade) { if (isUpgrade) {
log.info("Starting ThingsBoard Upgrade from version {} ...", upgradeFromVersion); log.info("Starting ThingsBoard Upgrade from version {} ...", upgradeFromVersion);
switch (upgradeFromVersion) { if ("2.5.0-cassandra".equals(upgradeFromVersion)) {
case "1.2.3": //NOSONAR, Need to execute gradual upgrade starting from upgradeFromVersion log.info("Migrating ThingsBoard entities data from cassandra to SQL database ...");
log.info("Upgrading ThingsBoard from version 1.2.3 to 1.3.0 ..."); entitiesMigrateService.migrate();
log.info("Updating system data...");
systemDataLoaderService.updateSystemWidgets();
} else {
switch (upgradeFromVersion) {
case "1.2.3": //NOSONAR, Need to execute gradual upgrade starting from upgradeFromVersion
log.info("Upgrading ThingsBoard from version 1.2.3 to 1.3.0 ...");
databaseEntitiesUpgradeService.upgradeDatabase("1.2.3"); databaseEntitiesUpgradeService.upgradeDatabase("1.2.3");
case "1.3.0": //NOSONAR, Need to execute gradual upgrade starting from upgradeFromVersion case "1.3.0": //NOSONAR, Need to execute gradual upgrade starting from upgradeFromVersion
log.info("Upgrading ThingsBoard from version 1.3.0 to 1.3.1 ..."); log.info("Upgrading ThingsBoard from version 1.3.0 to 1.3.1 ...");
databaseEntitiesUpgradeService.upgradeDatabase("1.3.0"); databaseEntitiesUpgradeService.upgradeDatabase("1.3.0");
case "1.3.1": //NOSONAR, Need to execute gradual upgrade starting from upgradeFromVersion case "1.3.1": //NOSONAR, Need to execute gradual upgrade starting from upgradeFromVersion
log.info("Upgrading ThingsBoard from version 1.3.1 to 1.4.0 ..."); log.info("Upgrading ThingsBoard from version 1.3.1 to 1.4.0 ...");
databaseEntitiesUpgradeService.upgradeDatabase("1.3.1"); databaseEntitiesUpgradeService.upgradeDatabase("1.3.1");
case "1.4.0": case "1.4.0":
log.info("Upgrading ThingsBoard from version 1.4.0 to 2.0.0 ..."); log.info("Upgrading ThingsBoard from version 1.4.0 to 2.0.0 ...");
databaseEntitiesUpgradeService.upgradeDatabase("1.4.0"); databaseEntitiesUpgradeService.upgradeDatabase("1.4.0");
dataUpdateService.updateData("1.4.0"); dataUpdateService.updateData("1.4.0");
case "2.0.0": case "2.0.0":
log.info("Upgrading ThingsBoard from version 2.0.0 to 2.1.1 ..."); log.info("Upgrading ThingsBoard from version 2.0.0 to 2.1.1 ...");
databaseEntitiesUpgradeService.upgradeDatabase("2.0.0"); databaseEntitiesUpgradeService.upgradeDatabase("2.0.0");
case "2.1.1": case "2.1.1":
log.info("Upgrading ThingsBoard from version 2.1.1 to 2.1.2 ..."); log.info("Upgrading ThingsBoard from version 2.1.1 to 2.1.2 ...");
databaseEntitiesUpgradeService.upgradeDatabase("2.1.1"); databaseEntitiesUpgradeService.upgradeDatabase("2.1.1");
case "2.1.3": case "2.1.3":
log.info("Upgrading ThingsBoard from version 2.1.3 to 2.2.0 ..."); log.info("Upgrading ThingsBoard from version 2.1.3 to 2.2.0 ...");
databaseEntitiesUpgradeService.upgradeDatabase("2.1.3"); databaseEntitiesUpgradeService.upgradeDatabase("2.1.3");
case "2.3.0": case "2.3.0":
log.info("Upgrading ThingsBoard from version 2.3.0 to 2.3.1 ..."); log.info("Upgrading ThingsBoard from version 2.3.0 to 2.3.1 ...");
databaseEntitiesUpgradeService.upgradeDatabase("2.3.0"); databaseEntitiesUpgradeService.upgradeDatabase("2.3.0");
case "2.3.1": case "2.3.1":
log.info("Upgrading ThingsBoard from version 2.3.1 to 2.4.0 ..."); log.info("Upgrading ThingsBoard from version 2.3.1 to 2.4.0 ...");
databaseEntitiesUpgradeService.upgradeDatabase("2.3.1"); databaseEntitiesUpgradeService.upgradeDatabase("2.3.1");
case "2.4.0": case "2.4.0":
log.info("Upgrading ThingsBoard from version 2.4.0 to 2.4.1 ..."); log.info("Upgrading ThingsBoard from version 2.4.0 to 2.4.1 ...");
case "2.4.1": case "2.4.1":
log.info("Upgrading ThingsBoard from version 2.4.1 to 2.4.2 ..."); log.info("Upgrading ThingsBoard from version 2.4.1 to 2.4.2 ...");
databaseEntitiesUpgradeService.upgradeDatabase("2.4.1"); databaseEntitiesUpgradeService.upgradeDatabase("2.4.1");
case "2.4.2": case "2.4.2":
log.info("Upgrading ThingsBoard from version 2.4.2 to 2.4.3 ..."); log.info("Upgrading ThingsBoard from version 2.4.2 to 2.4.3 ...");
databaseEntitiesUpgradeService.upgradeDatabase("2.4.2"); databaseEntitiesUpgradeService.upgradeDatabase("2.4.2");
case "2.4.3": case "2.4.3":
log.info("Upgrading ThingsBoard from version 2.4.3 to 2.5 ..."); log.info("Upgrading ThingsBoard from version 2.4.3 to 2.5 ...");
if (databaseTsUpgradeService != null) { if (databaseTsUpgradeService != null) {
databaseTsUpgradeService.upgradeDatabase("2.4.3"); databaseTsUpgradeService.upgradeDatabase("2.4.3");
} }
databaseEntitiesUpgradeService.upgradeDatabase("2.4.3"); databaseEntitiesUpgradeService.upgradeDatabase("2.4.3");
log.info("Updating system data..."); log.info("Updating system data...");
systemDataLoaderService.deleteSystemWidgetBundle("charts"); systemDataLoaderService.updateSystemWidgets();
systemDataLoaderService.deleteSystemWidgetBundle("cards"); break;
systemDataLoaderService.deleteSystemWidgetBundle("maps"); default:
systemDataLoaderService.deleteSystemWidgetBundle("analogue_gauges"); throw new RuntimeException("Unable to upgrade ThingsBoard, unsupported fromVersion: " + upgradeFromVersion);
systemDataLoaderService.deleteSystemWidgetBundle("digital_gauges");
systemDataLoaderService.deleteSystemWidgetBundle("gpio_widgets");
systemDataLoaderService.deleteSystemWidgetBundle("alarm_widgets");
systemDataLoaderService.deleteSystemWidgetBundle("control_widgets");
systemDataLoaderService.deleteSystemWidgetBundle("maps_v2");
systemDataLoaderService.deleteSystemWidgetBundle("gateway_widgets");
systemDataLoaderService.deleteSystemWidgetBundle("input_widgets");
systemDataLoaderService.deleteSystemWidgetBundle("date");
systemDataLoaderService.deleteSystemWidgetBundle("entity_admin_widgets");
systemDataLoaderService.loadSystemWidgets();
break;
default:
throw new RuntimeException("Unable to upgrade ThingsBoard, unsupported fromVersion: " + upgradeFromVersion);
}
} }
log.info("Upgrade finished successfully!"); log.info("Upgrade finished successfully!");

View File

@ -168,6 +168,24 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
installScripts.loadSystemWidgets(); installScripts.loadSystemWidgets();
} }
@Override
public void updateSystemWidgets() throws Exception {
this.deleteSystemWidgetBundle("charts");
this.deleteSystemWidgetBundle("cards");
this.deleteSystemWidgetBundle("maps");
this.deleteSystemWidgetBundle("analogue_gauges");
this.deleteSystemWidgetBundle("digital_gauges");
this.deleteSystemWidgetBundle("gpio_widgets");
this.deleteSystemWidgetBundle("alarm_widgets");
this.deleteSystemWidgetBundle("control_widgets");
this.deleteSystemWidgetBundle("maps_v2");
this.deleteSystemWidgetBundle("gateway_widgets");
this.deleteSystemWidgetBundle("input_widgets");
this.deleteSystemWidgetBundle("date");
this.deleteSystemWidgetBundle("entity_admin_widgets");
installScripts.loadSystemWidgets();
}
private User createUser(Authority authority, private User createUser(Authority authority,
TenantId tenantId, TenantId tenantId,
CustomerId customerId, CustomerId customerId,

View File

@ -23,6 +23,8 @@ public interface SystemDataLoaderService {
void loadSystemWidgets() throws Exception; void loadSystemWidgets() throws Exception;
void updateSystemWidgets() throws Exception;
void loadDemoData() throws Exception; void loadDemoData() throws Exception;
void deleteSystemWidgetBundle(String bundleAlias) throws Exception; void deleteSystemWidgetBundle(String bundleAlias) throws Exception;

View File

@ -0,0 +1,253 @@
package org.thingsboard.server.service.install.migrate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.util.SqlDao;
import org.thingsboard.server.service.install.EntityDatabaseSchemaService;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Arrays;
import java.util.List;
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.bigintColumn;
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.booleanColumn;
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.doubleColumn;
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.enumToIntColumn;
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.idColumn;
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.stringColumn;
@Service
@Profile("install")
@SqlDao
@Slf4j
public class CassandraEntitiesToSqlMigrateService implements EntitiesMigrateService {
@Autowired
private EntityDatabaseSchemaService entityDatabaseSchemaService;
@Autowired
protected CassandraCluster cluster;
@Value("${spring.datasource.url}")
protected String dbUrl;
@Value("${spring.datasource.username}")
protected String dbUserName;
@Value("${spring.datasource.password}")
protected String dbPassword;
@Override
public void migrate() throws Exception {
log.info("Performing migration of entities data from cassandra to SQL database ...");
entityDatabaseSchemaService.createDatabaseSchema();
try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
conn.setAutoCommit(true);
for (CassandraToSqlTable table: tables) {
table.migrateToSql(cluster.getSession(), conn);
}
} catch (Exception e) {
log.error("Unexpected error during ThingsBoard entities data migration!", e);
throw e;
}
}
private static List<CassandraToSqlTable> tables = Arrays.asList(
new CassandraToSqlTable("admin_settings",
idColumn("id"),
stringColumn("key"),
stringColumn("json_value")),
new CassandraToSqlTable("alarm",
idColumn("id"),
idColumn("tenant_id"),
stringColumn("type"),
idColumn("originator_id"),
enumToIntColumn("originator_type", EntityType.class),
stringColumn("severity"),
stringColumn("status"),
bigintColumn("start_ts"),
bigintColumn("end_ts"),
bigintColumn("ack_ts"),
bigintColumn("clear_ts"),
stringColumn("details", "additional_info"),
booleanColumn("propagate"),
stringColumn("propagate_relation_types")),
new CassandraToSqlTable("asset",
idColumn("id"),
idColumn("tenant_id"),
idColumn("customer_id"),
stringColumn("name"),
stringColumn("type"),
stringColumn("label"),
stringColumn("search_text"),
stringColumn("additional_info")),
new CassandraToSqlTable("audit_log_by_tenant_id", "audit_log",
idColumn("id"),
idColumn("tenant_id"),
idColumn("customer_id"),
idColumn("entity_id"),
stringColumn("entity_type"),
stringColumn("entity_name"),
idColumn("user_id"),
stringColumn("user_name"),
stringColumn("action_type"),
stringColumn("action_data"),
stringColumn("action_status"),
stringColumn("action_failure_details")),
new CassandraToSqlTable("attributes_kv_cf", "attribute_kv",
idColumn("entity_id"),
stringColumn("entity_type"),
stringColumn("attribute_type"),
stringColumn("attribute_key"),
booleanColumn("bool_v"),
stringColumn("str_v"),
bigintColumn("long_v"),
doubleColumn("dbl_v"),
stringColumn("json_v"),
bigintColumn("last_update_ts")),
new CassandraToSqlTable("component_descriptor",
idColumn("id"),
stringColumn("type"),
stringColumn("scope"),
stringColumn("name"),
stringColumn("search_text"),
stringColumn("clazz"),
stringColumn("configuration_descriptor"),
stringColumn("actions")),
new CassandraToSqlTable("customer",
idColumn("id"),
idColumn("tenant_id"),
stringColumn("title"),
stringColumn("search_text"),
stringColumn("country"),
stringColumn("state"),
stringColumn("city"),
stringColumn("address"),
stringColumn("address2"),
stringColumn("zip"),
stringColumn("phone"),
stringColumn("email"),
stringColumn("additional_info")),
new CassandraToSqlTable("dashboard",
idColumn("id"),
idColumn("tenant_id"),
stringColumn("title"),
stringColumn("search_text"),
stringColumn("assigned_customers"),
stringColumn("configuration")),
new CassandraToSqlTable("device",
idColumn("id"),
idColumn("tenant_id"),
idColumn("customer_id"),
stringColumn("name"),
stringColumn("type"),
stringColumn("label"),
stringColumn("search_text"),
stringColumn("additional_info")),
new CassandraToSqlTable("device_credentials",
idColumn("id"),
idColumn("device_id"),
stringColumn("credentials_type"),
stringColumn("credentials_id"),
stringColumn("credentials_value")),
new CassandraToSqlTable("event",
idColumn("id"),
idColumn("tenant_id"),
idColumn("entity_id"),
stringColumn("entity_type"),
stringColumn("event_type"),
stringColumn("event_uid"),
stringColumn("body")),
new CassandraToSqlTable("relation",
idColumn("from_id"),
stringColumn("from_type"),
idColumn("to_id"),
stringColumn("to_type"),
stringColumn("relation_type_group"),
stringColumn("relation_type"),
stringColumn("additional_info")),
new CassandraToSqlTable("user", "tb_user",
idColumn("id"),
idColumn("tenant_id"),
idColumn("customer_id"),
stringColumn("email"),
stringColumn("search_text"),
stringColumn("authority"),
stringColumn("first_name"),
stringColumn("last_name"),
stringColumn("additional_info")),
new CassandraToSqlTable("tenant",
idColumn("id"),
stringColumn("title"),
stringColumn("search_text"),
stringColumn("region"),
stringColumn("country"),
stringColumn("state"),
stringColumn("city"),
stringColumn("address"),
stringColumn("address2"),
stringColumn("zip"),
stringColumn("phone"),
stringColumn("email"),
stringColumn("additional_info")),
new CassandraToSqlTable("user_credentials",
idColumn("id"),
idColumn("user_id"),
booleanColumn("enabled"),
stringColumn("password"),
stringColumn("activate_token"),
stringColumn("reset_token")),
new CassandraToSqlTable("widget_type",
idColumn("id"),
idColumn("tenant_id"),
stringColumn("bundle_alias"),
stringColumn("alias"),
stringColumn("name"),
stringColumn("descriptor")),
new CassandraToSqlTable("widgets_bundle",
idColumn("id"),
idColumn("tenant_id"),
stringColumn("alias"),
stringColumn("title"),
stringColumn("search_text")),
new CassandraToSqlTable("rule_chain",
idColumn("id"),
idColumn("tenant_id"),
stringColumn("name"),
stringColumn("search_text"),
idColumn("first_rule_node_id"),
booleanColumn("root"),
booleanColumn("debug_mode"),
stringColumn("configuration"),
stringColumn("additional_info")),
new CassandraToSqlTable("rule_node",
idColumn("id"),
idColumn("rule_chain_id"),
stringColumn("type"),
stringColumn("name"),
booleanColumn("debug_mode"),
stringColumn("search_text"),
stringColumn("configuration"),
stringColumn("additional_info")),
new CassandraToSqlTable("entity_view",
idColumn("id"),
idColumn("tenant_id"),
idColumn("customer_id"),
idColumn("entity_id"),
stringColumn("entity_type"),
stringColumn("name"),
stringColumn("type"),
stringColumn("keys"),
bigintColumn("start_ts"),
bigintColumn("end_ts"),
stringColumn("search_text"),
stringColumn("additional_info"))
);
}

View File

@ -0,0 +1,159 @@
package org.thingsboard.server.service.install.migrate;
import com.datastax.driver.core.Row;
import lombok.Data;
import org.thingsboard.server.common.data.UUIDConverter;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
@Data
public class CassandraToSqlColumn {
private String cassandraColumnName;
private String sqlColumnName;
private CassandraToSqlColumnType type;
private int sqlType;
private Class<? extends Enum> enumClass;
public static CassandraToSqlColumn idColumn(String name) {
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.ID);
}
public static CassandraToSqlColumn stringColumn(String name) {
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.STRING);
}
public static CassandraToSqlColumn stringColumn(String cassandraColumnName, String sqlColumnName) {
return new CassandraToSqlColumn(cassandraColumnName, sqlColumnName);
}
public static CassandraToSqlColumn bigintColumn(String name) {
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.BIGINT);
}
public static CassandraToSqlColumn doubleColumn(String name) {
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.DOUBLE);
}
public static CassandraToSqlColumn booleanColumn(String name) {
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.BOOLEAN);
}
public static CassandraToSqlColumn enumToIntColumn(String name, Class<? extends Enum> enumClass) {
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.ENUM_TO_INT, enumClass);
}
public CassandraToSqlColumn(String columnName) {
this(columnName, columnName, CassandraToSqlColumnType.STRING, null);
}
public CassandraToSqlColumn(String columnName, CassandraToSqlColumnType type) {
this(columnName, columnName, type, null);
}
public CassandraToSqlColumn(String columnName, CassandraToSqlColumnType type, Class<? extends Enum> enumClass) {
this(columnName, columnName, type, enumClass);
}
public CassandraToSqlColumn(String cassandraColumnName, String sqlColumnName) {
this(cassandraColumnName, sqlColumnName, CassandraToSqlColumnType.STRING, null);
}
public CassandraToSqlColumn(String cassandraColumnName, String sqlColumnName, CassandraToSqlColumnType type,
Class<? extends Enum> enumClass) {
this.cassandraColumnName = cassandraColumnName;
this.sqlColumnName = sqlColumnName;
this.type = type;
this.enumClass = enumClass;
switch (this.type) {
case ID:
case STRING:
this.sqlType = Types.VARCHAR;
break;
case DOUBLE:
this.sqlType = Types.DOUBLE;
break;
case INTEGER:
case ENUM_TO_INT:
this.sqlType = Types.INTEGER;
break;
case FLOAT:
this.sqlType = Types.FLOAT;
break;
case BIGINT:
this.sqlType = Types.BIGINT;
break;
case BOOLEAN:
this.sqlType = Types.BOOLEAN;
break;
}
}
public void prepareColumnValue(Row row, PreparedStatement sqlInsertStatement, int index) throws SQLException {
String value = this.getColumnValue(row, index);
this.setColumnValue(sqlInsertStatement, index, value);
}
private String getColumnValue(Row row, int index) {
if (row.isNull(index)) {
return null;
} else {
switch (this.type) {
case ID:
return UUIDConverter.fromTimeUUID(row.getUUID(index));
case DOUBLE:
return Double.toString(row.getDouble(index));
case INTEGER:
return Integer.toString(row.getInt(index));
case FLOAT:
return Float.toString(row.getFloat(index));
case BIGINT:
return Long.toString(row.getLong(index));
case BOOLEAN:
return Boolean.toString(row.getBool(index));
case STRING:
case ENUM_TO_INT:
default:
return row.getString(index);
}
}
}
private void setColumnValue(PreparedStatement sqlInsertStatement, int index, String value) throws SQLException {
if (value == null) {
sqlInsertStatement.setNull(index, this.sqlType);
} else {
switch (this.type) {
case DOUBLE:
sqlInsertStatement.setDouble(index, Double.parseDouble(value));
break;
case INTEGER:
sqlInsertStatement.setInt(index, Integer.parseInt(value));
break;
case FLOAT:
sqlInsertStatement.setFloat(index, Float.parseFloat(value));
break;
case BIGINT:
sqlInsertStatement.setLong(index, Long.parseLong(value));
break;
case BOOLEAN:
sqlInsertStatement.setBoolean(index, Boolean.parseBoolean(value));
break;
case ENUM_TO_INT:
Enum enumVal = Enum.valueOf(this.enumClass, value);
int intValue = enumVal.ordinal();
sqlInsertStatement.setInt(index, intValue);
break;
case STRING:
case ID:
default:
sqlInsertStatement.setString(index, value);
break;
}
}
}
}

View File

@ -0,0 +1,12 @@
package org.thingsboard.server.service.install.migrate;
public enum CassandraToSqlColumnType {
ID,
DOUBLE,
INTEGER,
FLOAT,
BIGINT,
BOOLEAN,
STRING,
ENUM_TO_INT
}

View File

@ -0,0 +1,99 @@
package org.thingsboard.server.service.install.migrate;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@Data
@Slf4j
public class CassandraToSqlTable {
private String cassandraCf;
private String sqlTableName;
private List<CassandraToSqlColumn> columns;
public CassandraToSqlTable(String tableName, CassandraToSqlColumn... columns) {
this(tableName, tableName, columns);
}
public CassandraToSqlTable(String cassandraCf, String sqlTableName, CassandraToSqlColumn... columns) {
this.cassandraCf = cassandraCf;
this.sqlTableName = sqlTableName;
this.columns = Arrays.asList(columns);
}
public void migrateToSql(Session session, Connection conn) throws SQLException {
log.info("Migrating data from cassandra '{}' Column Family to '{}' SQL table...", this.cassandraCf, this.sqlTableName);
PreparedStatement sqlInsertStatement = createSqlInsertStatement(conn);
Statement cassandraSelectStatement = createCassandraSelectStatement();
cassandraSelectStatement.setFetchSize(100);
ResultSet rs = session.execute(cassandraSelectStatement);
Iterator<Row> iter = rs.iterator();
int rowCounter = 0;
while (iter.hasNext()) {
Row row = iter.next();
if (row != null) {
this.migrateRowToSql(row, sqlInsertStatement);
rowCounter++;
if (rowCounter % 100 == 0) {
sqlInsertStatement.executeBatch();
log.info("{} records migrated so far...", rowCounter);
}
}
}
if (rowCounter % 100 > 0) {
sqlInsertStatement.executeBatch();
}
sqlInsertStatement.close();
log.info("{} total records migrated.", rowCounter);
log.info("Finished migration data from cassandra '{}' Column Family to '{}' SQL table.", this.cassandraCf, this.sqlTableName);
}
private void migrateRowToSql(Row row, PreparedStatement sqlInsertStatement) throws SQLException {
for (int i=0; i<this.columns.size();i++) {
CassandraToSqlColumn column = this.columns.get(i);
column.prepareColumnValue(row, sqlInsertStatement, i);
}
sqlInsertStatement.addBatch();
}
private Statement createCassandraSelectStatement() {
StringBuilder selectStatementBuilder = new StringBuilder();
selectStatementBuilder.append("SELECT ");
for (CassandraToSqlColumn column : columns) {
selectStatementBuilder.append(column.getCassandraColumnName()).append(",");
}
selectStatementBuilder.deleteCharAt(selectStatementBuilder.length() - 1);
selectStatementBuilder.append(" FROM ").append(cassandraCf);
return new SimpleStatement(selectStatementBuilder.toString());
}
private PreparedStatement createSqlInsertStatement(Connection conn) throws SQLException {
StringBuilder insertStatementBuilder = new StringBuilder();
insertStatementBuilder.append("INSERT INTO ").append(this.sqlTableName).append(" (");
for (CassandraToSqlColumn column : columns) {
insertStatementBuilder.append(column.getSqlColumnName()).append(",");
}
insertStatementBuilder.deleteCharAt(insertStatementBuilder.length() - 1);
insertStatementBuilder.append(") VALUES (");
for (CassandraToSqlColumn ignored : columns) {
insertStatementBuilder.append("?").append(",");
}
insertStatementBuilder.deleteCharAt(insertStatementBuilder.length() - 1);
insertStatementBuilder.append(")");
return conn.prepareStatement(insertStatementBuilder.toString());
}
}

View File

@ -0,0 +1,7 @@
package org.thingsboard.server.service.install.migrate;
public interface EntitiesMigrateService {
void migrate() throws Exception;
}