tool for migrating from Postgres to hybrid mode

This commit is contained in:
vparomskiy 2019-10-14 18:09:18 +03:00
parent 4d215f3191
commit b95dc476cb
6 changed files with 672 additions and 0 deletions

View File

@ -51,6 +51,38 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>3.11.4</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.thingsboard.client.tools.migrator.MigratorTool</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

View File

@ -0,0 +1,93 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.client.tools.migrator;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import java.io.File;
/**
 * Command-line entry point of the migration tool.
 *
 * Reads the dump locations and output directories from the command line, then runs the
 * latest-telemetry migration followed by the telemetry (and partitions) migration.
 */
public class MigratorTool {

    /**
     * Parses the arguments and runs both migrations in sequence.
     *
     * @param args command-line arguments; every option registered in {@link #parseArgs(String[])} is required
     * @throws IllegalStateException wrapping any failure raised while migrating
     */
    public static void main(String[] args) {
        CommandLine cmd = parseArgs(args);

        try {
            File latestDump = fileOption(cmd, "latestTelemetryFrom");
            File latestOutDir = fileOption(cmd, "latestTelemetryOut");
            File tsDump = fileOption(cmd, "telemetryFrom");
            File tsOutDir = fileOption(cmd, "telemetryOut");
            File partitionsOutDir = fileOption(cmd, "partitionsOut");
            boolean castEnable = Boolean.parseBoolean(cmd.getOptionValue("castEnable"));

            PgCaLatestMigrator.migrateLatest(latestDump, latestOutDir, castEnable);
            PostgresToCassandraTelemetryMigrator.migrateTs(tsDump, tsOutDir, partitionsOutDir, castEnable);
        } catch (Throwable th) {
            th.printStackTrace();
            throw new IllegalStateException("failed", th);
        }
    }

    /**
     * Declares the supported options and parses {@code args} against them.
     * On a parse error the message and the usage help are printed and the JVM exits with status 1.
     *
     * @param args raw command-line arguments
     * @return the parsed command line
     */
    private static CommandLine parseArgs(String[] args) {
        Options options = new Options();
        addRequired(options, "latestFrom", "latestTelemetryFrom", "latest telemetry source file path");
        addRequired(options, "latestOut", "latestTelemetryOut", "latest telemetry save dir");
        addRequired(options, "tsFrom", "telemetryFrom", "telemetry source file path");
        addRequired(options, "tsOut", "telemetryOut", "sstable save dir");
        addRequired(options, "partitionsOut", "partitionsOut", "partitions save dir");
        addRequired(options, "castEnable", "castEnable", "cast String to Double if possible");

        CommandLineParser parser = new BasicParser();
        CommandLine parsed = null;
        try {
            parsed = parser.parse(options, args);
        } catch (ParseException e) {
            System.out.println(e.getMessage());
            new HelpFormatter().printHelp("utility-name", options);
            System.exit(1);
        }
        return parsed;
    }

    /** Registers a mandatory option that takes one argument. */
    private static void addRequired(Options options, String shortName, String longName, String description) {
        Option option = new Option(shortName, longName, true, description);
        option.setRequired(true);
        options.addOption(option);
    }

    /** Wraps the value of the given option into a {@link File}. */
    private static File fileOption(CommandLine cmd, String optionName) {
        return new File(cmd.getOptionValue(optionName));
    }
}

View File

@ -0,0 +1,179 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.client.tools.migrator;
import com.google.common.collect.Lists;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
/**
 * Converts the {@code ts_kv_latest} data block of a plain-text Postgres dump into SSTables
 * written through {@link WriterBuilder#getLatestWriter(File)}.
 *
 * The dump is scanned line by line: everything before the {@code COPY public.ts_kv_latest}
 * header is skipped, every following line is treated as one tab-separated row, and scanning
 * stops at the first blank line or at the {@code \.} terminator.
 *
 * All counters and the current writer are static, so the class keeps state between calls.
 */
public class PgCaLatestMigrator {

    private static final long LOG_BATCH = 1000000;
    private static final long ROWS_PER_FILE = 1000000;

    // Positions of str_v and dbl_v inside the 8-element row built by toValues():
    // (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v)
    private static final int STR_V_INDEX = 5;
    private static final int DBL_V_INDEX = 7;

    private static long linesProcessed = 0;
    private static long linesMigrated = 0;
    private static long castErrors = 0;
    private static long castedOk = 0;
    // Starts at 1 so that the very first row does not trigger a writer rotation.
    private static long currentWriterCount = 1;
    private static CQLSSTableWriter currentTsWriter = null;

    /**
     * Migrates the latest-telemetry rows found in {@code sourceFile}.
     *
     * Rows that cannot be parsed or written are reported on stdout together with the
     * offending line and skipped; they do not abort the migration.
     *
     * @param sourceFile            uncompressed dump containing the {@code ts_kv_latest} COPY block
     * @param outDir                directory handed to the SSTable writer
     * @param castStringsIfPossible when true, numeric-looking {@code str_v} values are moved to {@code dbl_v}
     * @throws IOException if the source cannot be opened or a resource fails to close
     */
    public static void migrateLatest(File sourceFile, File outDir, boolean castStringsIfPossible) throws IOException {
        long startTs = System.currentTimeMillis();
        long stepLineTs = System.currentTimeMillis();
        long stepOkLineTs = System.currentTimeMillis();

        LineIterator iterator = FileUtils.lineIterator(sourceFile);
        try {
            // The initial writer must target the same table as the rotated writers below;
            // the rows built here carry 8 values, matching the ts_kv_latest_cf insert.
            currentTsWriter = WriterBuilder.getLatestWriter(outDir);
            try {
                boolean isBlockStarted = false;
                boolean isBlockFinished = false;
                String line;

                while (iterator.hasNext()) {
                    if (linesProcessed++ % LOG_BATCH == 0) {
                        System.out.println(new Date() + " linesProcessed = " + linesProcessed + " in " + (System.currentTimeMillis() - stepLineTs) + " castOk " + castedOk + " castErr " + castErrors);
                        stepLineTs = System.currentTimeMillis();
                    }

                    line = iterator.nextLine();

                    if (isBlockFinished) {
                        break;
                    }

                    if (!isBlockStarted) {
                        if (isBlockStarted(line)) {
                            System.out.println();
                            System.out.println();
                            System.out.println(line);
                            System.out.println();
                            System.out.println();
                            isBlockStarted = true;
                        }
                        continue;
                    }

                    if (isBlockFinished(line)) {
                        isBlockFinished = true;
                    } else {
                        try {
                            // NOTE(review): trimming and dropping empty fields shifts the columns of a row
                            // whose str_v is an empty string; such rows end up in the catch below. Confirm
                            // whether empty string values need to be preserved.
                            List<String> raw = Arrays.stream(line.trim().split("\t"))
                                    .map(String::trim)
                                    .filter(StringUtils::isNotEmpty)
                                    .collect(Collectors.toList());
                            List<Object> values = toValues(raw);

                            // A zero counter means the previous file reached ROWS_PER_FILE rows.
                            if (currentWriterCount == 0) {
                                System.out.println(new Date() + " close writer " + new Date());
                                currentTsWriter.close();
                                currentTsWriter = WriterBuilder.getLatestWriter(outDir);
                            }

                            if (castStringsIfPossible) {
                                currentTsWriter.addRow(castToNumericIfPossible(values));
                            } else {
                                currentTsWriter.addRow(values);
                            }

                            currentWriterCount++;
                            if (currentWriterCount >= ROWS_PER_FILE) {
                                currentWriterCount = 0;
                            }

                            if (linesMigrated++ % LOG_BATCH == 0) {
                                System.out.println(new Date() + " migrated = " + linesMigrated + " in " + (System.currentTimeMillis() - stepOkLineTs));
                                stepOkLineTs = System.currentTimeMillis();
                            }
                        } catch (Exception ex) {
                            // Best effort: report the bad row and keep going.
                            System.out.println(ex.getMessage() + " -> " + line);
                        }
                    }
                }

                long endTs = System.currentTimeMillis();
                System.out.println();
                System.out.println(new Date() + " Migrated rows " + linesMigrated + " in " + (endTs - startTs));
            } finally {
                currentTsWriter.close();
            }
        } finally {
            // The dump reader was previously never released.
            iterator.close();
        }

        System.out.println();
        System.out.println("Finished migrate Latest Telemetry");
    }

    /**
     * Moves a numeric-looking {@code str_v} into {@code dbl_v}.
     *
     * @param values row in the layout produced by {@link #toValues(List)}
     * @return a copy with {@code str_v} cleared and {@code dbl_v} set when the string parses as a
     *         double; otherwise the original list
     */
    private static List<Object> castToNumericIfPossible(List<Object> values) {
        try {
            Object strValue = values.get(STR_V_INDEX);
            if (strValue != null && NumberUtils.isNumber(strValue.toString())) {
                Double casted = NumberUtils.createDouble(strValue.toString());
                List<Object> numeric = new ArrayList<>(values);
                numeric.set(STR_V_INDEX, null);
                numeric.set(DBL_V_INDEX, casted);
                castedOk++;
                return numeric;
            }
        } catch (RuntimeException ex) {
            // isNumber() accepts forms that createDouble() rejects; count and keep the row as is.
            castErrors++;
        }
        return values;
    }

    /**
     * Converts the textual fields of one dump row into typed values.
     * The literal {@code \N} is read as null for the four value columns.
     *
     * @param raw fields in the order entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v
     * @return the typed row, in the same order
     */
    private static List<Object> toValues(List<String> raw) {
        //expected Table structure:
        // COPY public.ts_kv_latest (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) FROM stdin;
        List<Object> result = new ArrayList<>();
        result.add(raw.get(0));
        result.add(fromString(raw.get(1)));
        result.add(raw.get(2));

        long ts = Long.parseLong(raw.get(3));
        result.add(ts);

        result.add(raw.get(4).equals("\\N") ? null : raw.get(4).equals("t") ? Boolean.TRUE : Boolean.FALSE);
        result.add(raw.get(5).equals("\\N") ? null : raw.get(5));
        result.add(raw.get(6).equals("\\N") ? null : Long.parseLong(raw.get(6)));
        result.add(raw.get(7).equals("\\N") ? null : Double.parseDouble(raw.get(7)));
        return result;
    }

    /**
     * Rebuilds a {@link UUID} from the dash-less identifier stored in the dump by reordering its
     * character groups into the 8-4-4-4-12 layout and inserting a literal {@code 1} before the
     * first three characters.
     *
     * @param src identifier text; presumably 31 characters long - confirm against the dump format
     * @return the parsed UUID
     */
    public static UUID fromString(String src) {
        return UUID.fromString(src.substring(7, 15) + "-" + src.substring(3, 7) + "-1"
                + src.substring(0, 3) + "-" + src.substring(15, 19) + "-" + src.substring(19));
    }

    /** Returns true for the COPY header line that opens the {@code ts_kv_latest} data block. */
    private static boolean isBlockStarted(String line) {
        return line.startsWith("COPY public.ts_kv_latest");
    }

    /** Returns true for a blank line or the {@code \.} terminator that ends a COPY data block. */
    private static boolean isBlockFinished(String line) {
        return StringUtils.isBlank(line) || line.equals("\\.");
    }
}

View File

@ -0,0 +1,222 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.client.tools.migrator;
import com.google.common.collect.Lists;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
/**
 * Converts the {@code ts_kv} data block of a plain-text Postgres dump into SSTables for the
 * telemetry table and for the partitions table, both obtained from {@link WriterBuilder}.
 *
 * The dump is scanned line by line: everything before the {@code COPY public.ts_kv} header is
 * skipped, every following line is treated as one tab-separated row, and scanning stops at the
 * first blank line or at the {@code \.} terminator. The distinct
 * (entity_type, entity_id, key, partition) tuples seen on the way are kept in memory and written
 * to the partitions table once all rows have been processed.
 *
 * All counters, writers and the partition set are static, so the class keeps state between calls.
 */
public class PostgresToCassandraTelemetryMigrator {

    private static final long LOG_BATCH = 1000000;
    private static final long ROWS_PER_FILE = 1000000;

    // Layout of the 9-element row built by toValues():
    // (entity_type, entity_id, key, partition, ts, bool_v, str_v, long_v, dbl_v)
    private static final int PARTITION_KEY_COLUMNS = 4;
    private static final int STR_V_INDEX = 6;
    private static final int DBL_V_INDEX = 8;

    private static long linesProcessed = 0;
    private static long linesMigrated = 0;
    private static long castErrors = 0;
    private static long castedOk = 0;
    // Starts at 1 so that the very first row does not trigger a writer rotation.
    private static long currentWriterCount = 1;
    private static CQLSSTableWriter currentTsWriter = null;
    private static CQLSSTableWriter currentPartitionWriter = null;

    // Stored as structured tuples rather than as a '|'-joined string: a telemetry key that itself
    // contains '|' cannot be split back reliably and used to break the final partition write.
    private static final Set<List<Object>> partitions = new HashSet<>();

    /**
     * Migrates the telemetry rows found in {@code sourceFile} and then writes the collected partitions.
     *
     * Rows that cannot be parsed or written are reported on stdout together with the offending
     * line and skipped; they do not abort the migration.
     *
     * @param sourceFile            uncompressed dump containing the {@code ts_kv} COPY block
     * @param outTsDir              directory handed to the telemetry SSTable writer
     * @param outPartitionDir       directory handed to the partitions SSTable writer
     * @param castStringsIfPossible when true, numeric-looking {@code str_v} values are moved to {@code dbl_v}
     * @throws IOException if the source cannot be opened or a resource fails to close
     */
    public static void migrateTs(File sourceFile, File outTsDir, File outPartitionDir, boolean castStringsIfPossible) throws IOException {
        long startTs = System.currentTimeMillis();
        long stepLineTs = System.currentTimeMillis();
        long stepOkLineTs = System.currentTimeMillis();

        LineIterator iterator = FileUtils.lineIterator(sourceFile);
        try {
            currentTsWriter = WriterBuilder.getTsWriter(outTsDir);
            try {
                currentPartitionWriter = WriterBuilder.getPartitionWriter(outPartitionDir);
                try {
                    boolean isBlockStarted = false;
                    boolean isBlockFinished = false;
                    String line;

                    while (iterator.hasNext()) {
                        if (linesProcessed++ % LOG_BATCH == 0) {
                            System.out.println(new Date() + " linesProcessed = " + linesProcessed + " in " + (System.currentTimeMillis() - stepLineTs) + " castOk " + castedOk + " castErr " + castErrors);
                            stepLineTs = System.currentTimeMillis();
                        }

                        line = iterator.nextLine();

                        if (isBlockFinished) {
                            break;
                        }

                        if (!isBlockStarted) {
                            if (isBlockStarted(line)) {
                                System.out.println();
                                System.out.println();
                                System.out.println(line);
                                System.out.println();
                                System.out.println();
                                isBlockStarted = true;
                            }
                            continue;
                        }

                        if (isBlockFinished(line)) {
                            isBlockFinished = true;
                        } else {
                            try {
                                // NOTE(review): trimming and dropping empty fields shifts the columns of a row
                                // whose str_v is an empty string; such rows end up in the catch below. Confirm
                                // whether empty string values need to be preserved.
                                List<String> raw = Arrays.stream(line.trim().split("\t"))
                                        .map(String::trim)
                                        .filter(StringUtils::isNotEmpty)
                                        .collect(Collectors.toList());
                                List<Object> values = toValues(raw);

                                // A zero counter means the previous file reached ROWS_PER_FILE rows.
                                if (currentWriterCount == 0) {
                                    System.out.println(new Date() + " close writer " + new Date());
                                    currentTsWriter.close();
                                    currentTsWriter = WriterBuilder.getTsWriter(outTsDir);
                                }

                                if (castStringsIfPossible) {
                                    currentTsWriter.addRow(castToNumericIfPossible(values));
                                } else {
                                    currentTsWriter.addRow(values);
                                }
                                processPartitions(values);

                                currentWriterCount++;
                                if (currentWriterCount >= ROWS_PER_FILE) {
                                    currentWriterCount = 0;
                                }

                                if (linesMigrated++ % LOG_BATCH == 0) {
                                    System.out.println(new Date() + " migrated = " + linesMigrated + " in " + (System.currentTimeMillis() - stepOkLineTs) + " partitions = " + partitions.size());
                                    stepOkLineTs = System.currentTimeMillis();
                                }
                            } catch (Exception ex) {
                                // Best effort: report the bad row and keep going.
                                System.out.println(ex.getMessage() + " -> " + line);
                            }
                        }
                    }

                    long endTs = System.currentTimeMillis();
                    System.out.println();
                    System.out.println(new Date() + " Migrated rows " + linesMigrated + " in " + (endTs - startTs));
                    System.out.println("Partitions collected " + partitions.size());

                    startTs = System.currentTimeMillis();
                    for (List<Object> partition : partitions) {
                        currentPartitionWriter.addRow(partition);
                    }
                } finally {
                    currentPartitionWriter.close();
                }

                long partitionsEndTs = System.currentTimeMillis();
                System.out.println();
                System.out.println();
                System.out.println(new Date() + " Migrated partitions " + partitions.size() + " in " + (partitionsEndTs - startTs));
            } finally {
                currentTsWriter.close();
            }
        } finally {
            // The dump reader was previously never released.
            iterator.close();
        }

        System.out.println();
        System.out.println("Finished migrate Telemetry");
    }

    /**
     * Moves a numeric-looking {@code str_v} into {@code dbl_v}.
     *
     * @param values row in the layout produced by {@link #toValues(List)}
     * @return a copy with {@code str_v} cleared and {@code dbl_v} set when the string parses as a
     *         double; otherwise the original list
     */
    private static List<Object> castToNumericIfPossible(List<Object> values) {
        try {
            Object strValue = values.get(STR_V_INDEX);
            if (strValue != null && NumberUtils.isNumber(strValue.toString())) {
                Double casted = NumberUtils.createDouble(strValue.toString());
                List<Object> numeric = new ArrayList<>(values);
                numeric.set(STR_V_INDEX, null);
                numeric.set(DBL_V_INDEX, casted);
                castedOk++;
                return numeric;
            }
        } catch (RuntimeException ex) {
            // isNumber() accepts forms that createDouble() rejects; count and keep the row as is.
            castErrors++;
        }
        return values;
    }

    /**
     * Remembers the (entity_type, entity_id, key, partition) tuple of a migrated row.
     * A copy is stored because {@code subList} only returns a view of {@code values}.
     */
    private static void processPartitions(List<Object> values) {
        partitions.add(new ArrayList<>(values.subList(0, PARTITION_KEY_COLUMNS)));
    }

    /**
     * Converts the textual fields of one dump row into typed values and inserts the partition
     * timestamp derived from {@code ts}. The literal {@code \N} is read as null for the four
     * value columns.
     *
     * @param raw fields in the order entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v
     * @return entity_type, entity_id, key, partition, ts, bool_v, str_v, long_v, dbl_v
     */
    private static List<Object> toValues(List<String> raw) {
        //expected Table structure:
        // COPY public.ts_kv (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) FROM stdin;
        List<Object> result = new ArrayList<>();
        result.add(raw.get(0));
        result.add(fromString(raw.get(1)));
        result.add(raw.get(2));

        long ts = Long.parseLong(raw.get(3));
        long partition = toPartitionTs(ts);
        result.add(partition);
        result.add(ts);

        result.add(raw.get(4).equals("\\N") ? null : raw.get(4).equals("t") ? Boolean.TRUE : Boolean.FALSE);
        result.add(raw.get(5).equals("\\N") ? null : raw.get(5));
        result.add(raw.get(6).equals("\\N") ? null : Long.parseLong(raw.get(6)));
        result.add(raw.get(7).equals("\\N") ? null : Double.parseDouble(raw.get(7)));
        return result;
    }

    /**
     * Rebuilds a {@link UUID} from the dash-less identifier stored in the dump by reordering its
     * character groups into the 8-4-4-4-12 layout and inserting a literal {@code 1} before the
     * first three characters.
     *
     * @param src identifier text; presumably 31 characters long - confirm against the dump format
     * @return the parsed UUID
     */
    public static UUID fromString(String src) {
        return UUID.fromString(src.substring(7, 15) + "-" + src.substring(3, 7) + "-1"
                + src.substring(0, 3) + "-" + src.substring(15, 19) + "-" + src.substring(19));
    }

    /**
     * Maps a timestamp to the start of its month.
     *
     * @param ts epoch milliseconds
     * @return epoch milliseconds of 00:00 UTC on the first day of the month containing {@code ts}
     */
    private static long toPartitionTs(long ts) {
        LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
        return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /**
     * Returns true for the COPY header line that opens the {@code ts_kv} data block.
     * The trailing space keeps the header of {@code ts_kv_latest} from matching as well.
     */
    private static boolean isBlockStarted(String line) {
        return line.startsWith("COPY public.ts_kv ");
    }

    /** Returns true for a blank line or the {@code \.} terminator that ends a COPY data block. */
    private static boolean isBlockFinished(String line) {
        return StringUtils.isBlank(line) || line.equals("\\.");
    }
}

View File

@ -0,0 +1,62 @@
# Description:
Tool used for migrating ThingsBoard into hybrid mode from Postgres.
Performance of this tool depends on disk type and instance type (mostly on CPU resources).
In general, here are a few benchmarks:
1. Creating Dump of the postgres ts_kv table -> 100GB = 90 minutes
2. If postgres table has size 100GB then dump file will be about 30GB
3. Generating SSTables from the dump -> 100GB = 3 hours
4. 100GB Dump file will be converted into SSTable with size about 18GB
# How to build Tool:
Switch to `tools` project in Command Line and execute
`mvn clean compile assembly:single`
It will generate single jar with dependencies.
# Instruction:
1. Dump Telemetry table from the source Postgres table. Do not use compression if possible because Tool can only work with uncompressed file
dump ts_kv table -> `pg_dump -h localhost -U postgres -d thingsboard -t ts_kv > ts_kv.dmp`
2. Dump Latest Telemetry table from the source Postgres table. Do not use compression if possible because Tool can only work with uncompressed file
dump ts_kv_latest -> `pg_dump -h localhost -U postgres -d thingsboard -t ts_kv_latest > ts_kv_latest.dmp`
3. [Optional] - move dumped files to the machine where cassandra will be hosted
4. Prepare directory structure:
Tool will use 3 different directories for saving SSTables - ts_kv_cf, ts_kv_latest_cf, ts_kv_partitions_cf
Create 3 empty directories. For example:
/home/ubunut/migration/ts
/home/ubunut/migration/ts_latest
/home/ubunut/migration/ts_partition
5. Run tool:
*Note: if you run this tool on remote instance - don't forget to execute this command in screen to avoid unexpected termination
java -jar ./tools-2.4.1-SNAPSHOT-jar-with-dependencies.jar
-latestFrom ./source/ts_kv_latest.dmp
-latestOut /home/ubunut/migration/ts_latest
-tsFrom ./source/ts_kv.dmp
-tsOut /home/ubunut/migration/ts
-partitionsOut /home/ubunut/migration/ts_partition
-castEnable false
## After tool finished
1. install Cassandra on the instance
2. Using `cqlsh`, create the `thingsboard` keyspace and the required tables from this file: `schema-ts.cql`
3. Stop Cassandra
4. Copy generated SSTable files into cassandra data dir:
sudo find /home/ubunut/migration/ts -name '*.*' -exec mv {} /var/lib/cassandra/data/thingsboard/ts_kv_cf-0e9aaf00ee5511e9a5fa7d6f489ffd13/ \;
sudo find /home/ubunut/migration/ts_latest -name '*.*' -exec mv {} /var/lib/cassandra/data/thingsboard/ts_kv_latest_cf-161449d0ee5511e9a5fa7d6f489ffd13/ \;
sudo find /home/ubunut/migration/ts_partition -name '*.*' -exec mv {} /var/lib/cassandra/data/thingsboard/ts_kv_partitions_cf-12e8fa80ee5511e9a5fa7d6f489ffd13/ \;
5. Start Cassandra service and trigger compaction
trigger compactions: nodetool compact thingsboard
check compaction status: nodetool compactionstats
6. Switch ThingsBoard to hybrid mode:
Modify the ThingsBoard properties file `thingsboard.yml`
- DATABASE_TS_TYPE = cassandra
- TS_KV_PARTITIONING = MONTHS
- [optional] - connection properties for cassandra
7. Start Thingsboard

View File

@ -0,0 +1,84 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.client.tools.migrator;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import java.io.File;
/**
 * Factory for the {@link CQLSSTableWriter} instances used by the migrators.
 * Each factory method pairs one table definition with the matching insert statement.
 */
public class WriterBuilder {

    // Telemetry table definition.
    private static final String TS_SCHEMA = "CREATE TABLE thingsboard.ts_kv_cf (\n" +
            " entity_type text, // (DEVICE, CUSTOMER, TENANT)\n" +
            " entity_id timeuuid,\n" +
            " key text,\n" +
            " partition bigint,\n" +
            " ts bigint,\n" +
            " bool_v boolean,\n" +
            " str_v text,\n" +
            " long_v bigint,\n" +
            " dbl_v double,\n" +
            " PRIMARY KEY (( entity_type, entity_id, key, partition ), ts)\n" +
            ");";

    private static final String TS_INSERT =
            "INSERT INTO thingsboard.ts_kv_cf (entity_type, entity_id, key, partition, ts, bool_v, str_v, long_v, dbl_v) " +
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";

    // Latest-telemetry table definition.
    private static final String LATEST_SCHEMA = "CREATE TABLE IF NOT EXISTS thingsboard.ts_kv_latest_cf (\n" +
            " entity_type text, // (DEVICE, CUSTOMER, TENANT)\n" +
            " entity_id timeuuid,\n" +
            " key text,\n" +
            " ts bigint,\n" +
            " bool_v boolean,\n" +
            " str_v text,\n" +
            " long_v bigint,\n" +
            " dbl_v double,\n" +
            " PRIMARY KEY (( entity_type, entity_id ), key)\n" +
            ") WITH compaction = { 'class' : 'LeveledCompactionStrategy' };";

    private static final String LATEST_INSERT =
            "INSERT INTO thingsboard.ts_kv_latest_cf (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " +
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)";

    // Partitions table definition.
    private static final String PARTITION_SCHEMA = "CREATE TABLE IF NOT EXISTS thingsboard.ts_kv_partitions_cf (\n" +
            " entity_type text, // (DEVICE, CUSTOMER, TENANT)\n" +
            " entity_id timeuuid,\n" +
            " key text,\n" +
            " partition bigint,\n" +
            " PRIMARY KEY (( entity_type, entity_id, key ), partition)\n" +
            ") WITH CLUSTERING ORDER BY ( partition ASC )\n" +
            " AND compaction = { 'class' : 'LeveledCompactionStrategy' };";

    private static final String PARTITION_INSERT =
            "INSERT INTO thingsboard.ts_kv_partitions_cf (entity_type, entity_id, key, partition) " +
            "VALUES (?, ?, ?, ?)";

    /**
     * Creates a writer for the telemetry table.
     *
     * @param dir output directory passed to the writer builder
     * @return a new writer expecting nine values per row
     */
    public static CQLSSTableWriter getTsWriter(File dir) {
        return newWriter(dir, TS_SCHEMA, TS_INSERT);
    }

    /**
     * Creates a writer for the latest-telemetry table.
     *
     * @param dir output directory passed to the writer builder
     * @return a new writer expecting eight values per row
     */
    public static CQLSSTableWriter getLatestWriter(File dir) {
        return newWriter(dir, LATEST_SCHEMA, LATEST_INSERT);
    }

    /**
     * Creates a writer for the partitions table.
     *
     * @param dir output directory passed to the writer builder
     * @return a new writer expecting four values per row
     */
    public static CQLSSTableWriter getPartitionWriter(File dir) {
        return newWriter(dir, PARTITION_SCHEMA, PARTITION_INSERT);
    }

    /** Assembles a writer from an output directory, a table definition and an insert statement. */
    private static CQLSSTableWriter newWriter(File dir, String schema, String insertStatement) {
        return CQLSSTableWriter.builder()
                .inDirectory(dir)
                .forTable(schema)
                .using(insertStatement)
                .build();
    }
}