Improve CSV data dump.
This commit is contained in:
parent
063a1093e0
commit
9944439539
@ -169,7 +169,7 @@ public class CassandraDatabaseUpgradeService implements DatabaseUpgradeService {
|
|||||||
Path dashboardsDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), DASHBOARD,
|
Path dashboardsDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), DASHBOARD,
|
||||||
new String[]{ID, TENANT_ID, CUSTOMER_ID, TITLE, SEARCH_TEXT, ASSIGNED_CUSTOMERS, CONFIGURATION},
|
new String[]{ID, TENANT_ID, CUSTOMER_ID, TITLE, SEARCH_TEXT, ASSIGNED_CUSTOMERS, CONFIGURATION},
|
||||||
new String[]{"", "", "", "", "", "", ""},
|
new String[]{"", "", "", "", "", "", ""},
|
||||||
"tb-dashboards");
|
"tb-dashboards", true);
|
||||||
log.info("Dashboards dumped.");
|
log.info("Dashboards dumped.");
|
||||||
|
|
||||||
|
|
||||||
@ -181,7 +181,7 @@ public class CassandraDatabaseUpgradeService implements DatabaseUpgradeService {
|
|||||||
log.info("Restoring dashboards ...");
|
log.info("Restoring dashboards ...");
|
||||||
if (dashboardsDump != null) {
|
if (dashboardsDump != null) {
|
||||||
CassandraDbHelper.loadCf(ks, cluster.getSession(), DASHBOARD,
|
CassandraDbHelper.loadCf(ks, cluster.getSession(), DASHBOARD,
|
||||||
new String[]{ID, TENANT_ID, TITLE, SEARCH_TEXT, CONFIGURATION}, dashboardsDump);
|
new String[]{ID, TENANT_ID, TITLE, SEARCH_TEXT, CONFIGURATION}, dashboardsDump, true);
|
||||||
DatabaseHelper.upgradeTo40_assignDashboards(dashboardsDump, dashboardService, false);
|
DatabaseHelper.upgradeTo40_assignDashboards(dashboardsDump, dashboardService, false);
|
||||||
Files.deleteIfExists(dashboardsDump);
|
Files.deleteIfExists(dashboardsDump);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -79,7 +79,7 @@ public class SqlDatabaseUpgradeService implements DatabaseUpgradeService {
|
|||||||
Path dashboardsDump = SqlDbHelper.dumpTableIfExists(conn, DASHBOARD,
|
Path dashboardsDump = SqlDbHelper.dumpTableIfExists(conn, DASHBOARD,
|
||||||
new String[]{ID, TENANT_ID, CUSTOMER_ID, TITLE, SEARCH_TEXT, ASSIGNED_CUSTOMERS, CONFIGURATION},
|
new String[]{ID, TENANT_ID, CUSTOMER_ID, TITLE, SEARCH_TEXT, ASSIGNED_CUSTOMERS, CONFIGURATION},
|
||||||
new String[]{"", "", "", "", "", "", ""},
|
new String[]{"", "", "", "", "", "", ""},
|
||||||
"tb-dashboards");
|
"tb-dashboards", true);
|
||||||
log.info("Dashboards dumped.");
|
log.info("Dashboards dumped.");
|
||||||
|
|
||||||
log.info("Updating schema ...");
|
log.info("Updating schema ...");
|
||||||
@ -91,7 +91,7 @@ public class SqlDatabaseUpgradeService implements DatabaseUpgradeService {
|
|||||||
log.info("Restoring dashboards ...");
|
log.info("Restoring dashboards ...");
|
||||||
if (dashboardsDump != null) {
|
if (dashboardsDump != null) {
|
||||||
SqlDbHelper.loadTable(conn, DASHBOARD,
|
SqlDbHelper.loadTable(conn, DASHBOARD,
|
||||||
new String[]{ID, TENANT_ID, TITLE, SEARCH_TEXT, CONFIGURATION}, dashboardsDump);
|
new String[]{ID, TENANT_ID, TITLE, SEARCH_TEXT, CONFIGURATION}, dashboardsDump, true);
|
||||||
DatabaseHelper.upgradeTo40_assignDashboards(dashboardsDump, dashboardService, true);
|
DatabaseHelper.upgradeTo40_assignDashboards(dashboardsDump, dashboardService, true);
|
||||||
Files.deleteIfExists(dashboardsDump);
|
Files.deleteIfExists(dashboardsDump);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,6 +17,7 @@
|
|||||||
package org.thingsboard.server.service.install.cql;
|
package org.thingsboard.server.service.install.cql;
|
||||||
|
|
||||||
import com.datastax.driver.core.*;
|
import com.datastax.driver.core.*;
|
||||||
|
import org.apache.commons.csv.CSVFormat;
|
||||||
import org.apache.commons.csv.CSVParser;
|
import org.apache.commons.csv.CSVParser;
|
||||||
import org.apache.commons.csv.CSVPrinter;
|
import org.apache.commons.csv.CSVPrinter;
|
||||||
import org.apache.commons.csv.CSVRecord;
|
import org.apache.commons.csv.CSVRecord;
|
||||||
@ -33,10 +34,19 @@ public class CassandraDbHelper {
|
|||||||
|
|
||||||
public static Path dumpCfIfExists(KeyspaceMetadata ks, Session session, String cfName,
|
public static Path dumpCfIfExists(KeyspaceMetadata ks, Session session, String cfName,
|
||||||
String[] columns, String[] defaultValues, String dumpPrefix) throws Exception {
|
String[] columns, String[] defaultValues, String dumpPrefix) throws Exception {
|
||||||
|
return dumpCfIfExists(ks, session, cfName, columns, defaultValues, dumpPrefix, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Path dumpCfIfExists(KeyspaceMetadata ks, Session session, String cfName,
|
||||||
|
String[] columns, String[] defaultValues, String dumpPrefix, boolean printHeader) throws Exception {
|
||||||
if (ks.getTable(cfName) != null) {
|
if (ks.getTable(cfName) != null) {
|
||||||
Path dumpFile = Files.createTempFile(dumpPrefix, null);
|
Path dumpFile = Files.createTempFile(dumpPrefix, null);
|
||||||
Files.deleteIfExists(dumpFile);
|
Files.deleteIfExists(dumpFile);
|
||||||
try (CSVPrinter csvPrinter = new CSVPrinter(Files.newBufferedWriter(dumpFile), CSV_DUMP_FORMAT)) {
|
CSVFormat csvFormat = CSV_DUMP_FORMAT;
|
||||||
|
if (printHeader) {
|
||||||
|
csvFormat = csvFormat.withHeader(columns);
|
||||||
|
}
|
||||||
|
try (CSVPrinter csvPrinter = new CSVPrinter(Files.newBufferedWriter(dumpFile), csvFormat)) {
|
||||||
Statement stmt = new SimpleStatement("SELECT * FROM " + cfName);
|
Statement stmt = new SimpleStatement("SELECT * FROM " + cfName);
|
||||||
stmt.setFetchSize(1000);
|
stmt.setFetchSize(1000);
|
||||||
ResultSet rs = session.execute(stmt);
|
ResultSet rs = session.execute(stmt);
|
||||||
@ -74,9 +84,19 @@ public class CassandraDbHelper {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static void loadCf(KeyspaceMetadata ks, Session session, String cfName, String[] columns, Path sourceFile) throws Exception {
|
public static void loadCf(KeyspaceMetadata ks, Session session, String cfName, String[] columns, Path sourceFile) throws Exception {
|
||||||
|
loadCf(ks, session, cfName, columns, sourceFile, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void loadCf(KeyspaceMetadata ks, Session session, String cfName, String[] columns, Path sourceFile, boolean parseHeader) throws Exception {
|
||||||
TableMetadata tableMetadata = ks.getTable(cfName);
|
TableMetadata tableMetadata = ks.getTable(cfName);
|
||||||
PreparedStatement prepared = session.prepare(createInsertStatement(cfName, columns));
|
PreparedStatement prepared = session.prepare(createInsertStatement(cfName, columns));
|
||||||
try (CSVParser csvParser = new CSVParser(Files.newBufferedReader(sourceFile), CSV_DUMP_FORMAT.withHeader(columns))) {
|
CSVFormat csvFormat = CSV_DUMP_FORMAT;
|
||||||
|
if (parseHeader) {
|
||||||
|
csvFormat = csvFormat.withFirstRecordAsHeader();
|
||||||
|
} else {
|
||||||
|
csvFormat = CSV_DUMP_FORMAT.withHeader(columns);
|
||||||
|
}
|
||||||
|
try (CSVParser csvParser = new CSVParser(Files.newBufferedReader(sourceFile), csvFormat)) {
|
||||||
csvParser.forEach(record -> {
|
csvParser.forEach(record -> {
|
||||||
BoundStatement boundStatement = prepared.bind();
|
BoundStatement boundStatement = prepared.bind();
|
||||||
for (String column : columns) {
|
for (String column : columns) {
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
package org.thingsboard.server.service.install.sql;
|
package org.thingsboard.server.service.install.sql;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.csv.CSVFormat;
|
||||||
import org.apache.commons.csv.CSVParser;
|
import org.apache.commons.csv.CSVParser;
|
||||||
import org.apache.commons.csv.CSVPrinter;
|
import org.apache.commons.csv.CSVPrinter;
|
||||||
import org.apache.commons.csv.CSVRecord;
|
import org.apache.commons.csv.CSVRecord;
|
||||||
@ -38,6 +39,11 @@ public class SqlDbHelper {
|
|||||||
|
|
||||||
public static Path dumpTableIfExists(Connection conn, String tableName,
|
public static Path dumpTableIfExists(Connection conn, String tableName,
|
||||||
String[] columns, String[] defaultValues, String dumpPrefix) throws Exception {
|
String[] columns, String[] defaultValues, String dumpPrefix) throws Exception {
|
||||||
|
return dumpTableIfExists(conn, tableName, columns, defaultValues, dumpPrefix, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Path dumpTableIfExists(Connection conn, String tableName,
|
||||||
|
String[] columns, String[] defaultValues, String dumpPrefix, boolean printHeader) throws Exception {
|
||||||
|
|
||||||
DatabaseMetaData metaData = conn.getMetaData();
|
DatabaseMetaData metaData = conn.getMetaData();
|
||||||
ResultSet res = metaData.getTables(null, null, tableName,
|
ResultSet res = metaData.getTables(null, null, tableName,
|
||||||
@ -46,7 +52,11 @@ public class SqlDbHelper {
|
|||||||
res.close();
|
res.close();
|
||||||
Path dumpFile = Files.createTempFile(dumpPrefix, null);
|
Path dumpFile = Files.createTempFile(dumpPrefix, null);
|
||||||
Files.deleteIfExists(dumpFile);
|
Files.deleteIfExists(dumpFile);
|
||||||
try (CSVPrinter csvPrinter = new CSVPrinter(Files.newBufferedWriter(dumpFile), CSV_DUMP_FORMAT)) {
|
CSVFormat csvFormat = CSV_DUMP_FORMAT;
|
||||||
|
if (printHeader) {
|
||||||
|
csvFormat = csvFormat.withHeader(columns);
|
||||||
|
}
|
||||||
|
try (CSVPrinter csvPrinter = new CSVPrinter(Files.newBufferedWriter(dumpFile), csvFormat)) {
|
||||||
try (PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + tableName)) {
|
try (PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + tableName)) {
|
||||||
try (ResultSet tableRes = stmt.executeQuery()) {
|
try (ResultSet tableRes = stmt.executeQuery()) {
|
||||||
ResultSetMetaData resMetaData = tableRes.getMetaData();
|
ResultSetMetaData resMetaData = tableRes.getMetaData();
|
||||||
@ -68,19 +78,30 @@ public class SqlDbHelper {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static void loadTable(Connection conn, String tableName, String[] columns, Path sourceFile) throws Exception {
|
public static void loadTable(Connection conn, String tableName, String[] columns, Path sourceFile) throws Exception {
|
||||||
PreparedStatement prepared = conn.prepareStatement(createInsertStatement(tableName, columns));
|
loadTable(conn, tableName, columns, sourceFile, false);
|
||||||
prepared.getParameterMetaData();
|
}
|
||||||
try (CSVParser csvParser = new CSVParser(Files.newBufferedReader(sourceFile), CSV_DUMP_FORMAT.withHeader(columns))) {
|
|
||||||
csvParser.forEach(record -> {
|
public static void loadTable(Connection conn, String tableName, String[] columns, Path sourceFile, boolean parseHeader) throws Exception {
|
||||||
try {
|
CSVFormat csvFormat = CSV_DUMP_FORMAT;
|
||||||
for (int i=0;i<columns.length;i++) {
|
if (parseHeader) {
|
||||||
setColumnValue(i, columns[i], record, prepared);
|
csvFormat = csvFormat.withFirstRecordAsHeader();
|
||||||
|
} else {
|
||||||
|
csvFormat = CSV_DUMP_FORMAT.withHeader(columns);
|
||||||
|
}
|
||||||
|
try (PreparedStatement prepared = conn.prepareStatement(createInsertStatement(tableName, columns))) {
|
||||||
|
prepared.getParameterMetaData();
|
||||||
|
try (CSVParser csvParser = new CSVParser(Files.newBufferedReader(sourceFile), csvFormat)) {
|
||||||
|
csvParser.forEach(record -> {
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < columns.length; i++) {
|
||||||
|
setColumnValue(i, columns[i], record, prepared);
|
||||||
|
}
|
||||||
|
prepared.execute();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.error("Unable to load table record!", e);
|
||||||
}
|
}
|
||||||
prepared.execute();
|
});
|
||||||
} catch (SQLException e) {
|
}
|
||||||
log.error("Unable to load table record!", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user