diff --git a/tools/pom.xml b/tools/pom.xml index bb0435d434..3b163b44b3 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -56,8 +56,9 @@ cassandra-all - com.datastax.oss - java-driver-core + com.datastax.cassandra + cassandra-driver-core + 3.10.1 commons-io diff --git a/tools/src/main/java/org/thingsboard/client/tools/migrator/DictionaryParser.java b/tools/src/main/java/org/thingsboard/client/tools/migrator/DictionaryParser.java new file mode 100644 index 0000000000..d88b4361da --- /dev/null +++ b/tools/src/main/java/org/thingsboard/client/tools/migrator/DictionaryParser.java @@ -0,0 +1,55 @@ +package org.thingsboard.client.tools.migrator; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.LineIterator; +import org.apache.commons.lang3.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class DictionaryParser { + private Map dictionaryParsed = new HashMap<>(); + + public DictionaryParser(File sourceFile) throws IOException { + parseDictionaryDump(FileUtils.lineIterator(sourceFile)); + } + + public String getKeyByKeyId(String keyId) { + return dictionaryParsed.get(keyId); + } + + private boolean isBlockFinished(String line) { + return StringUtils.isBlank(line) || line.equals("\\."); + } + + private boolean isBlockStarted(String line) { + return line.startsWith("COPY public.ts_kv_dictionary ("); + } + + private void parseDictionaryDump(LineIterator iterator) { + String tempLine; + while(iterator.hasNext()) { + tempLine = iterator.nextLine(); + + if(isBlockStarted(tempLine)) { + processBlock(iterator); + } + } + } + + private void processBlock(LineIterator lineIterator) { + String tempLine; + String[] lineSplited; + while(lineIterator.hasNext()) { + tempLine = lineIterator.nextLine(); + if(isBlockFinished(tempLine)) { + return; + } + + lineSplited = tempLine.split("\t"); + dictionaryParsed.put(lineSplited[1], lineSplited[0]); + } + } +} diff --git a/tools/src/main/java/org/thingsboard/client/tools/migrator/MigratorTool.java b/tools/src/main/java/org/thingsboard/client/tools/migrator/MigratorTool.java index e3a98e974f..83ca54624a 100644 --- a/tools/src/main/java/org/thingsboard/client/tools/migrator/MigratorTool.java +++ b/tools/src/main/java/org/thingsboard/client/tools/migrator/MigratorTool.java @@ -30,17 +30,25 @@ public class MigratorTool { public static void main(String[] args) { CommandLine cmd = parseArgs(args); - try { - File latestSource = new File(cmd.getOptionValue("latestTelemetryFrom")); - File latestSaveDir = new File(cmd.getOptionValue("latestTelemetryOut")); - File tsSource = new File(cmd.getOptionValue("telemetryFrom")); - File tsSaveDir = new File(cmd.getOptionValue("telemetryOut")); - File partitionsSaveDir = new File(cmd.getOptionValue("partitionsOut")); boolean castEnable = Boolean.parseBoolean(cmd.getOptionValue("castEnable")); + File allTelemetrySource = new File(cmd.getOptionValue("telemetryFrom")); - PgCaLatestMigrator.migrateLatest(latestSource, latestSaveDir, castEnable); - PostgresToCassandraTelemetryMigrator.migrateTs(tsSource, tsSaveDir, partitionsSaveDir, castEnable); + RelatedEntitiesParser allEntityIdsAndTypes = + new RelatedEntitiesParser(new File(cmd.getOptionValue("relatedEntities"))); + DictionaryParser dictionaryParser = new DictionaryParser(allTelemetrySource); + + if(cmd.getOptionValue("latestTelemetryOut") != null) { + File latestSaveDir = new File(cmd.getOptionValue("latestTelemetryOut")); + PgCaLatestMigrator.migrateLatest(allTelemetrySource, latestSaveDir, allEntityIdsAndTypes, dictionaryParser, castEnable); + } + if(cmd.getOptionValue("telemetryOut") != null) { + File tsSaveDir = new File(cmd.getOptionValue("telemetryOut")); + File partitionsSaveDir = new File(cmd.getOptionValue("partitionsOut")); + PostgresToCassandraTelemetryMigrator.migrateTs( + allTelemetrySource, tsSaveDir, partitionsSaveDir, allEntityIdsAndTypes, dictionaryParser, castEnable + ); + } } catch (Throwable th) { th.printStackTrace(); @@ -52,30 +60,30 @@ public class MigratorTool { private static CommandLine parseArgs(String[] args) { Options options = new Options(); - Option latestTsOpt = new Option("latestFrom", "latestTelemetryFrom", true, "latest telemetry source file path"); - latestTsOpt.setRequired(true); - options.addOption(latestTsOpt); + Option telemetryAllFrom = new Option("telemetryFrom", "telemetryFrom", true, "telemetry source file"); + telemetryAllFrom.setRequired(true); + options.addOption(telemetryAllFrom); Option latestTsOutOpt = new Option("latestOut", "latestTelemetryOut", true, "latest telemetry save dir"); - latestTsOutOpt.setRequired(true); + latestTsOutOpt.setRequired(false); options.addOption(latestTsOutOpt); - Option tsOpt = new Option("tsFrom", "telemetryFrom", true, "telemetry source file path"); - tsOpt.setRequired(true); - options.addOption(tsOpt); - Option tsOutOpt = new Option("tsOut", "telemetryOut", true, "sstable save dir"); - tsOutOpt.setRequired(true); + tsOutOpt.setRequired(false); options.addOption(tsOutOpt); Option partitionOutOpt = new Option("partitionsOut", "partitionsOut", true, "partitions save dir"); - partitionOutOpt.setRequired(true); + partitionOutOpt.setRequired(false); options.addOption(partitionOutOpt); Option castOpt = new Option("castEnable", "castEnable", true, "cast String to Double if possible"); castOpt.setRequired(true); options.addOption(castOpt); + Option relatedOpt = new Option("relatedEntities", "relatedEntities", true, "related entities source file path"); + relatedOpt.setRequired(true); + options.addOption(relatedOpt); + HelpFormatter formatter = new HelpFormatter(); CommandLineParser parser = new BasicParser(); diff --git a/tools/src/main/java/org/thingsboard/client/tools/migrator/PgCaLatestMigrator.java b/tools/src/main/java/org/thingsboard/client/tools/migrator/PgCaLatestMigrator.java index 5f0a84c7f4..5d4a2a6e81 100644 --- a/tools/src/main/java/org/thingsboard/client/tools/migrator/PgCaLatestMigrator.java +++ b/tools/src/main/java/org/thingsboard/client/tools/migrator/PgCaLatestMigrator.java @@ -43,14 +43,21 @@ public class PgCaLatestMigrator { private static long castedOk = 0; private static long currentWriterCount = 1; - private static CQLSSTableWriter currentTsWriter = null; + private static RelatedEntitiesParser allIdsAndTypes; + private static DictionaryParser keyPairs; - public static void migrateLatest(File sourceFile, File outDir, boolean castStringsIfPossible) throws IOException { + public static void migrateLatest(File sourceFile, + File outDir, + RelatedEntitiesParser allEntityIdsAndTypes, + DictionaryParser dictionaryParser, + boolean castStringsIfPossible) throws IOException { long startTs = System.currentTimeMillis(); long stepLineTs = System.currentTimeMillis(); long stepOkLineTs = System.currentTimeMillis(); LineIterator iterator = FileUtils.lineIterator(sourceFile); - currentTsWriter = WriterBuilder.getTsWriter(outDir); + CQLSSTableWriter currentTsWriter = WriterBuilder.getLatestWriter(outDir); + allIdsAndTypes = allEntityIdsAndTypes; + keyPairs = dictionaryParser; boolean isBlockStarted = false; boolean isBlockFinished = false; @@ -107,7 +114,7 @@ public class PgCaLatestMigrator { } if (linesMigrated++ % LOG_BATCH == 0) { - System.out.println(new Date() + " migrated = " + linesMigrated + " in " + (System.currentTimeMillis() - stepOkLineTs)); + System.out.println(new Date() + " migrated = " + linesMigrated + " in " + (System.currentTimeMillis() - stepOkLineTs) + " ms."); stepOkLineTs = System.currentTimeMillis(); } } catch (Exception ex) { @@ -119,7 +126,7 @@ public class PgCaLatestMigrator { long endTs = System.currentTimeMillis(); System.out.println(); - System.out.println(new Date() + " Migrated rows " + linesMigrated + " in " + (endTs - startTs)); + System.out.println(new Date() + " Migrated rows " + linesMigrated + " in " + (endTs - startTs) + " ts"); currentTsWriter.close(); System.out.println(); @@ -146,34 +153,31 @@ public class PgCaLatestMigrator { private static List toValues(List raw) { //expected Table structure: -// COPY public.ts_kv_latest (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) FROM stdin; - + //COPY public.ts_kv_latest (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) FROM stdin; List result = new ArrayList<>(); - result.add(raw.get(0)); - result.add(fromString(raw.get(1))); - result.add(raw.get(2)); + result.add(allIdsAndTypes.getEntityType(raw.get(0))); + result.add(UUID.fromString(raw.get(0))); + result.add(keyPairs.getKeyByKeyId(raw.get(1))); - long ts = Long.parseLong(raw.get(3)); - result.add(ts); + long ts = Long.parseLong(raw.get(2)); + result.add(3, ts); + + result.add(raw.get(3).equals("\\N") ? null : raw.get(3).equals("t") ? Boolean.TRUE : Boolean.FALSE); + result.add(raw.get(4).equals("\\N") ? null : raw.get(4)); + result.add(raw.get(5).equals("\\N") ? null : Long.parseLong(raw.get(5))); + result.add(raw.get(6).equals("\\N") ? null : Double.parseDouble(raw.get(6))); + result.add(raw.get(7).equals("\\N") ? null : raw.get(7)); - result.add(raw.get(4).equals("\\N") ? null : raw.get(4).equals("t") ? Boolean.TRUE : Boolean.FALSE); - result.add(raw.get(5).equals("\\N") ? null : raw.get(5)); - result.add(raw.get(6).equals("\\N") ? null : Long.parseLong(raw.get(6))); - result.add(raw.get(7).equals("\\N") ? null : Double.parseDouble(raw.get(7))); return result; } - public static UUID fromString(String src) { - return UUID.fromString(src.substring(7, 15) + "-" + src.substring(3, 7) + "-1" - + src.substring(0, 3) + "-" + src.substring(15, 19) + "-" + src.substring(19)); - } - private static boolean isBlockStarted(String line) { - return line.startsWith("COPY public.ts_kv_latest"); + return line.startsWith("COPY public.ts_kv_latest ("); } private static boolean isBlockFinished(String line) { return StringUtils.isBlank(line) || line.equals("\\."); } + } diff --git a/tools/src/main/java/org/thingsboard/client/tools/migrator/PostgresToCassandraTelemetryMigrator.java b/tools/src/main/java/org/thingsboard/client/tools/migrator/PostgresToCassandraTelemetryMigrator.java index 3ceab85ce3..80e617f505 100644 --- a/tools/src/main/java/org/thingsboard/client/tools/migrator/PostgresToCassandraTelemetryMigrator.java +++ b/tools/src/main/java/org/thingsboard/client/tools/migrator/PostgresToCassandraTelemetryMigrator.java @@ -42,7 +42,6 @@ public class PostgresToCassandraTelemetryMigrator { private static final long LOG_BATCH = 1000000; private static final long rowPerFile = 1000000; - private static long linesProcessed = 0; private static long linesMigrated = 0; private static long castErrors = 0; @@ -53,15 +52,23 @@ public class PostgresToCassandraTelemetryMigrator { private static CQLSSTableWriter currentPartitionWriter = null; private static Set partitions = new HashSet<>(); + private static RelatedEntitiesParser entityIdsAndTypes; + private static DictionaryParser keyParser; - - public static void migrateTs(File sourceFile, File outTsDir, File outPartitionDir, boolean castStringsIfPossible) throws IOException { + public static void migrateTs(File sourceFile, + File outTsDir, + File outPartitionDir, + RelatedEntitiesParser allEntityIdsAndTypes, + DictionaryParser dictionaryParser, + boolean castStringsIfPossible) throws IOException { long startTs = System.currentTimeMillis(); long stepLineTs = System.currentTimeMillis(); long stepOkLineTs = System.currentTimeMillis(); LineIterator iterator = FileUtils.lineIterator(sourceFile); currentTsWriter = WriterBuilder.getTsWriter(outTsDir); currentPartitionWriter = WriterBuilder.getPartitionWriter(outPartitionDir); + entityIdsAndTypes = allEntityIdsAndTypes; + keyParser = dictionaryParser; boolean isBlockStarted = false; boolean isBlockFinished = false; @@ -182,29 +189,24 @@ public class PostgresToCassandraTelemetryMigrator { //expected Table structure: // COPY public.ts_kv (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) FROM stdin; - List result = new ArrayList<>(); - result.add(raw.get(0)); - result.add(fromString(raw.get(1))); - result.add(raw.get(2)); + result.add(entityIdsAndTypes.getEntityType(raw.get(0))); + result.add(UUID.fromString(raw.get(0))); + result.add(keyParser.getKeyByKeyId(raw.get(1))); - long ts = Long.parseLong(raw.get(3)); + long ts = Long.parseLong(raw.get(2)); long partition = toPartitionTs(ts); result.add(partition); result.add(ts); - result.add(raw.get(4).equals("\\N") ? null : raw.get(4).equals("t") ? Boolean.TRUE : Boolean.FALSE); - result.add(raw.get(5).equals("\\N") ? null : raw.get(5)); - result.add(raw.get(6).equals("\\N") ? null : Long.parseLong(raw.get(6))); - result.add(raw.get(7).equals("\\N") ? null : Double.parseDouble(raw.get(7))); + result.add(raw.get(3).equals("\\N") ? null : raw.get(3).equals("t") ? Boolean.TRUE : Boolean.FALSE); + result.add(raw.get(4).equals("\\N") ? null : raw.get(4)); + result.add(raw.get(5).equals("\\N") ? null : Long.parseLong(raw.get(5))); + result.add(raw.get(6).equals("\\N") ? null : Double.parseDouble(raw.get(6))); + result.add(raw.get(7).equals("\\N") ? null : raw.get(7)); return result; } - public static UUID fromString(String src) { - return UUID.fromString(src.substring(7, 15) + "-" + src.substring(3, 7) + "-1" - + src.substring(0, 3) + "-" + src.substring(15, 19) + "-" + src.substring(19)); - } - private static long toPartitionTs(long ts) { LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).toInstant(ZoneOffset.UTC).toEpochMilli(); @@ -212,7 +214,7 @@ public class PostgresToCassandraTelemetryMigrator { } private static boolean isBlockStarted(String line) { - return line.startsWith("COPY public.ts_kv"); + return line.startsWith("COPY public.ts_kv ("); } private static boolean isBlockFinished(String line) { diff --git a/tools/src/main/java/org/thingsboard/client/tools/migrator/README.md b/tools/src/main/java/org/thingsboard/client/tools/migrator/README.md index 70c5dafcaf..1c5a6d54b7 100644 --- a/tools/src/main/java/org/thingsboard/client/tools/migrator/README.md +++ b/tools/src/main/java/org/thingsboard/client/tools/migrator/README.md @@ -21,15 +21,17 @@ It will generate single jar file with all required dependencies inside `target d #### Dump data from the source Postgres Database *Do not use compression if possible because Tool can only work with uncompressed file -1. Dump table `ts_kv` table: +*If you want to migrate just `ts_kv` without `ts_kv_latest` just don't dump an unnecessary table and when starting the tool don't use arguments (paths) for input dump and output files* - `pg_dump -h localhost -U postgres -d thingsboard -t ts_kv > ts_kv.dmp` +1. Dump related tables that need to correct save telemetry + + `pg_dump -h localhost -U postgres -d thingsboard -t tenant -t customer -t user -t dashboard -t asset -t device -t alarm -t rule_chain -t rule_node -t entity_view -t widgets_bundle -t widget_type -t tenant_profile -t device_profile -t api_usage_state -t tb_user > related_entities.dmp` + +2. Dump `ts_kv` and child: + + `pg_dump -h localhost -U postgres -d thingsboard --load-via-partition-root --data-only -t ts_kv* > ts_kv_all.dmp` -2. Dump table `ts_kv_latest` table: - - `pg_dump -h localhost -U postgres -d thingsboard -t ts_kv_latest > ts_kv_latest.dmp` - -3. [Optional] move table dumps to the instance where cassandra will be hosted +3. [Optional] Move table dumps to the instance where cassandra will be hosted #### Prepare directory structure for SSTables Tool use 3 different directories for saving SSTables - `ts_kv_cf`, `ts_kv_latest_cf`, `ts_kv_partitions_cf` @@ -45,9 +47,8 @@ Create 3 empty directories. For example: ``` java -jar ./tools-2.4.1-SNAPSHOT-jar-with-dependencies.jar - -latestFrom ./source/ts_kv_latest.dmp + -telemetryFrom ./source/ts_kv_all.dmp -latestOut /home/ubunut/migration/ts_latest - -tsFrom ./source/ts_kv.dmp -tsOut /home/ubunut/migration/ts -partitionsOut /home/ubunut/migration/ts_partition -castEnable false diff --git a/tools/src/main/java/org/thingsboard/client/tools/migrator/RelatedEntitiesParser.java b/tools/src/main/java/org/thingsboard/client/tools/migrator/RelatedEntitiesParser.java new file mode 100644 index 0000000000..655e623490 --- /dev/null +++ b/tools/src/main/java/org/thingsboard/client/tools/migrator/RelatedEntitiesParser.java @@ -0,0 +1,76 @@ +package org.thingsboard.client.tools.migrator; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.LineIterator; +import org.apache.commons.lang3.StringUtils; +import org.thingsboard.server.common.data.EntityType; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class RelatedEntitiesParser { + private final Map allEntityIdsAndTypes = new HashMap<>(); + + public RelatedEntitiesParser(File source) throws IOException { + processAllTables(FileUtils.lineIterator(source)); + } + + public String getEntityType(String uuid) { + return this.allEntityIdsAndTypes.get(uuid); + } + + private boolean isBlockFinished(String line) { + return StringUtils.isBlank(line) || line.equals("\\."); + } + + private void processAllTables(LineIterator lineIterator) { + String currentLine; + while(lineIterator.hasNext()) { + currentLine = lineIterator.nextLine(); + if(currentLine.startsWith("COPY public.alarm")) { + processBlock(lineIterator, EntityType.ALARM); + } else if (currentLine.startsWith("COPY public.asset")) { + processBlock(lineIterator, EntityType.ASSET); + } else if (currentLine.startsWith("COPY public.customer")) { + processBlock(lineIterator, EntityType.CUSTOMER); + } else if (currentLine.startsWith("COPY public.dashboard")) { + processBlock(lineIterator, EntityType.DASHBOARD); + } else if (currentLine.startsWith("COPY public.device")) { + processBlock(lineIterator, EntityType.DEVICE); + } else if (currentLine.startsWith("COPY public.rule_chain")) { + processBlock(lineIterator, EntityType.RULE_CHAIN); + } else if (currentLine.startsWith("COPY public.rule_node")) { + processBlock(lineIterator, EntityType.RULE_NODE); + } else if (currentLine.startsWith("COPY public.tenant")) { + processBlock(lineIterator, EntityType.TENANT); + } else if (currentLine.startsWith("COPY public.tb_user")) { + processBlock(lineIterator, EntityType.USER); + } else if (currentLine.startsWith("COPY public.entity_view")) { + processBlock(lineIterator, EntityType.ENTITY_VIEW); + } else if (currentLine.startsWith("COPY public.widgets_bundle")) { + processBlock(lineIterator, EntityType.WIDGETS_BUNDLE); + } else if (currentLine.startsWith("COPY public.widget_type")) { + processBlock(lineIterator, EntityType.WIDGET_TYPE); + } else if (currentLine.startsWith("COPY public.tenant_profile")) { + processBlock(lineIterator, EntityType.TENANT_PROFILE); + } else if (currentLine.startsWith("COPY public.device_profile")) { + processBlock(lineIterator, EntityType.DEVICE_PROFILE); + } else if (currentLine.startsWith("COPY public.api_usage_state")) { + processBlock(lineIterator, EntityType.API_USAGE_STATE); + } + } + } + + private void processBlock(LineIterator lineIterator, EntityType entityType) { + String currentLine; + while(lineIterator.hasNext()) { + currentLine = lineIterator.nextLine(); + if(isBlockFinished(currentLine)) { + return; + } + allEntityIdsAndTypes.put(currentLine.split("\t")[0], entityType.name()); + } + } +} diff --git a/tools/src/main/java/org/thingsboard/client/tools/migrator/WriterBuilder.java b/tools/src/main/java/org/thingsboard/client/tools/migrator/WriterBuilder.java index cfe36aaa14..5228da98a3 100644 --- a/tools/src/main/java/org/thingsboard/client/tools/migrator/WriterBuilder.java +++ b/tools/src/main/java/org/thingsboard/client/tools/migrator/WriterBuilder.java @@ -31,6 +31,7 @@ public class WriterBuilder { " str_v text,\n" + " long_v bigint,\n" + " dbl_v double,\n" + + " json_v text,\n" + " PRIMARY KEY (( entity_type, entity_id, key, partition ), ts)\n" + ");"; @@ -43,6 +44,7 @@ public class WriterBuilder { " str_v text,\n" + " long_v bigint,\n" + " dbl_v double,\n" + + " json_v text,\n" + " PRIMARY KEY (( entity_type, entity_id ), key)\n" + ") WITH compaction = { 'class' : 'LeveledCompactionStrategy' };"; @@ -59,8 +61,8 @@ public class WriterBuilder { return CQLSSTableWriter.builder() .inDirectory(dir) .forTable(tsSchema) - .using("INSERT INTO thingsboard.ts_kv_cf (entity_type, entity_id, key, partition, ts, bool_v, str_v, long_v, dbl_v) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)") + .using("INSERT INTO thingsboard.ts_kv_cf (entity_type, entity_id, key, partition, ts, bool_v, str_v, long_v, dbl_v, json_v) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") .build(); } @@ -68,8 +70,8 @@ public class WriterBuilder { return CQLSSTableWriter.builder() .inDirectory(dir) .forTable(latestSchema) - .using("INSERT INTO thingsboard.ts_kv_latest_cf (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?)") + .using("INSERT INTO thingsboard.ts_kv_latest_cf (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)") .build(); }