diff --git a/tools/pom.xml b/tools/pom.xml index 7402fe4708..16d511e6db 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -51,6 +51,38 @@ com.google.guava guava + + org.apache.cassandra + cassandra-all + 3.11.4 + compile + + + commons-io + commons-io + 2.6 + compile + + + + + + maven-assembly-plugin + + + + org.thingsboard.client.tools.migrator.MigratorTool + + + + jar-with-dependencies + + + + + + + diff --git a/tools/src/main/java/org/thingsboard/client/tools/migrator/MigratorTool.java b/tools/src/main/java/org/thingsboard/client/tools/migrator/MigratorTool.java new file mode 100644 index 0000000000..fdc1917ca8 --- /dev/null +++ b/tools/src/main/java/org/thingsboard/client/tools/migrator/MigratorTool.java @@ -0,0 +1,93 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.client.tools.migrator; + +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; + +import java.io.File; + +public class MigratorTool { + + public static void main(String[] args) { + CommandLine cmd = parseArgs(args); + + + try { + File latestSource = new File(cmd.getOptionValue("latestTelemetryFrom")); + File latestSaveDir = new File(cmd.getOptionValue("latestTelemetryOut")); + File tsSource = new File(cmd.getOptionValue("telemetryFrom")); + File tsSaveDir = new File(cmd.getOptionValue("telemetryOut")); + File partitionsSaveDir = new File(cmd.getOptionValue("partitionsOut")); + boolean castEnable = Boolean.parseBoolean(cmd.getOptionValue("castEnable")); + + PgCaLatestMigrator.migrateLatest(latestSource, latestSaveDir, castEnable); + PostgresToCassandraTelemetryMigrator.migrateTs(tsSource, tsSaveDir, partitionsSaveDir, castEnable); + + } catch (Throwable th) { + th.printStackTrace(); + throw new IllegalStateException("failed", th); + } + + } + + private static CommandLine parseArgs(String[] args) { + Options options = new Options(); + + Option latestTsOpt = new Option("latestFrom", "latestTelemetryFrom", true, "latest telemetry source file path"); + latestTsOpt.setRequired(true); + options.addOption(latestTsOpt); + + Option latestTsOutOpt = new Option("latestOut", "latestTelemetryOut", true, "latest telemetry save dir"); + latestTsOutOpt.setRequired(true); + options.addOption(latestTsOutOpt); + + Option tsOpt = new Option("tsFrom", "telemetryFrom", true, "telemetry source file path"); + tsOpt.setRequired(true); + options.addOption(tsOpt); + + Option tsOutOpt = new Option("tsOut", "telemetryOut", true, "sstable save dir"); + tsOutOpt.setRequired(true); + options.addOption(tsOutOpt); + + Option partitionOutOpt = new Option("partitionsOut", "partitionsOut", true, "partitions save dir"); + partitionOutOpt.setRequired(true); + options.addOption(partitionOutOpt); + + Option castOpt = new Option("castEnable", "castEnable", true, "cast String to Double if possible"); + castOpt.setRequired(true); + options.addOption(castOpt); + + HelpFormatter formatter = new HelpFormatter(); + CommandLineParser parser = new BasicParser(); + + try { + return parser.parse(options, args); + } catch (ParseException e) { + System.out.println(e.getMessage()); + formatter.printHelp("utility-name", options); + + System.exit(1); + } + return null; + } + +} diff --git a/tools/src/main/java/org/thingsboard/client/tools/migrator/PgCaLatestMigrator.java b/tools/src/main/java/org/thingsboard/client/tools/migrator/PgCaLatestMigrator.java new file mode 100644 index 0000000000..667f5e6d0f --- /dev/null +++ b/tools/src/main/java/org/thingsboard/client/tools/migrator/PgCaLatestMigrator.java @@ -0,0 +1,179 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.client.tools.migrator; + +import com.google.common.collect.Lists; +import org.apache.cassandra.io.sstable.CQLSSTableWriter; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.LineIterator; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +public class PgCaLatestMigrator { + + private static final long LOG_BATCH = 1000000; + private static final long rowPerFile = 1000000; + + + private static long linesProcessed = 0; + private static long linesMigrated = 0; + private static long castErrors = 0; + private static long castedOk = 0; + + private static long currentWriterCount = 1; + private static CQLSSTableWriter currentTsWriter = null; + + public static void migrateLatest(File sourceFile, File outDir, boolean castStringsIfPossible) throws IOException { + long startTs = System.currentTimeMillis(); + long stepLineTs = System.currentTimeMillis(); + long stepOkLineTs = System.currentTimeMillis(); + LineIterator iterator = FileUtils.lineIterator(sourceFile); + currentTsWriter = WriterBuilder.getTsWriter(outDir); + + boolean isBlockStarted = false; + boolean isBlockFinished = false; + + String line; + while (iterator.hasNext()) { + if (linesProcessed++ % LOG_BATCH == 0) { + System.out.println(new Date() + " linesProcessed = " + linesProcessed + " in " + (System.currentTimeMillis() - stepLineTs) + " castOk " + castedOk + " castErr " + castErrors); + stepLineTs = System.currentTimeMillis(); + } + + line = iterator.nextLine(); + + if (isBlockFinished) { + break; + } + + if (!isBlockStarted) { + if (isBlockStarted(line)) { + System.out.println(); + System.out.println(); + System.out.println(line); + System.out.println(); + System.out.println(); + isBlockStarted = true; + } + continue; + } + + if (isBlockFinished(line)) { + isBlockFinished = true; + } else { + try { + List raw = Arrays.stream(line.trim().split("\t")) + .map(String::trim) + .filter(StringUtils::isNotEmpty) + .collect(Collectors.toList()); + List values = toValues(raw); + + if (currentWriterCount == 0) { + System.out.println(new Date() + " close writer " + new Date()); + currentTsWriter.close(); + currentTsWriter = WriterBuilder.getLatestWriter(outDir); + } + + if (castStringsIfPossible) { + currentTsWriter.addRow(castToNumericIfPossible(values)); + } else { + currentTsWriter.addRow(values); + } + currentWriterCount++; + if (currentWriterCount >= rowPerFile) { + currentWriterCount = 0; + } + + if (linesMigrated++ % LOG_BATCH == 0) { + System.out.println(new Date() + " migrated = " + linesMigrated + " in " + (System.currentTimeMillis() - stepOkLineTs)); + stepOkLineTs = System.currentTimeMillis(); + } + } catch (Exception ex) { + System.out.println(ex.getMessage() + " -> " + line); + } + + } + } + + long endTs = System.currentTimeMillis(); + System.out.println(); + System.out.println(new Date() + " Migrated rows " + linesMigrated + " in " + (endTs - startTs)); + + currentTsWriter.close(); + System.out.println(); + System.out.println("Finished migrate Latest Telemetry"); + } + + + private static List castToNumericIfPossible(List values) { + try { + if (values.get(6) != null && NumberUtils.isNumber(values.get(6).toString())) { + Double casted = NumberUtils.createDouble(values.get(6).toString()); + List numeric = Lists.newArrayList(); + numeric.addAll(values); + numeric.set(6, null); + numeric.set(8, casted); + castedOk++; + return numeric; + } + } catch (Throwable th) { + castErrors++; + } + return values; + } + + private static List toValues(List raw) { + //expected Table structure: +// COPY public.ts_kv_latest (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) FROM stdin; + + + List result = new ArrayList<>(); + result.add(raw.get(0)); + result.add(fromString(raw.get(1))); + result.add(raw.get(2)); + + long ts = Long.parseLong(raw.get(3)); + result.add(ts); + + result.add(raw.get(4).equals("\\N") ? null : raw.get(4).equals("t") ? Boolean.TRUE : Boolean.FALSE); + result.add(raw.get(5).equals("\\N") ? null : raw.get(5)); + result.add(raw.get(6).equals("\\N") ? null : Long.parseLong(raw.get(6))); + result.add(raw.get(7).equals("\\N") ? null : Double.parseDouble(raw.get(7))); + return result; + } + + public static UUID fromString(String src) { + return UUID.fromString(src.substring(7, 15) + "-" + src.substring(3, 7) + "-1" + + src.substring(0, 3) + "-" + src.substring(15, 19) + "-" + src.substring(19)); + } + + private static boolean isBlockStarted(String line) { + return line.startsWith("COPY public.ts_kv_latest"); + } + + private static boolean isBlockFinished(String line) { + return StringUtils.isBlank(line) || line.equals("\\."); + } +} diff --git a/tools/src/main/java/org/thingsboard/client/tools/migrator/PostgresToCassandraTelemetryMigrator.java b/tools/src/main/java/org/thingsboard/client/tools/migrator/PostgresToCassandraTelemetryMigrator.java new file mode 100644 index 0000000000..fd8562ce3a --- /dev/null +++ b/tools/src/main/java/org/thingsboard/client/tools/migrator/PostgresToCassandraTelemetryMigrator.java @@ -0,0 +1,222 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.client.tools.migrator; + +import com.google.common.collect.Lists; +import org.apache.cassandra.io.sstable.CQLSSTableWriter; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.LineIterator; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; + +import java.io.File; +import java.io.IOException; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +public class PostgresToCassandraTelemetryMigrator { + + private static final long LOG_BATCH = 1000000; + private static final long rowPerFile = 1000000; + + + private static long linesProcessed = 0; + private static long linesMigrated = 0; + private static long castErrors = 0; + private static long castedOk = 0; + + private static long currentWriterCount = 1; + private static CQLSSTableWriter currentTsWriter = null; + private static CQLSSTableWriter currentPartitionWriter = null; + + private static Set partitions = new HashSet<>(); + + + public static void migrateTs(File sourceFile, File outTsDir, File outPartitionDir, boolean castStringsIfPossible) throws IOException { + long startTs = System.currentTimeMillis(); + long stepLineTs = System.currentTimeMillis(); + long stepOkLineTs = System.currentTimeMillis(); + LineIterator iterator = FileUtils.lineIterator(sourceFile); + currentTsWriter = WriterBuilder.getTsWriter(outTsDir); + currentPartitionWriter = WriterBuilder.getPartitionWriter(outPartitionDir); + + boolean isBlockStarted = false; + boolean isBlockFinished = false; + + String line; + while (iterator.hasNext()) { + if (linesProcessed++ % LOG_BATCH == 0) { + System.out.println(new Date() + " linesProcessed = " + linesProcessed + " in " + (System.currentTimeMillis() - stepLineTs) + " castOk " + castedOk + " castErr " + castErrors); + stepLineTs = System.currentTimeMillis(); + } + + line = iterator.nextLine(); + + if (isBlockFinished) { + break; + } + + if (!isBlockStarted) { + if (isBlockStarted(line)) { + System.out.println(); + System.out.println(); + System.out.println(line); + System.out.println(); + System.out.println(); + isBlockStarted = true; + } + continue; + } + + if (isBlockFinished(line)) { + isBlockFinished = true; + } else { + try { + List raw = Arrays.stream(line.trim().split("\t")) + .map(String::trim) + .filter(StringUtils::isNotEmpty) + .collect(Collectors.toList()); + List values = toValues(raw); + + if (currentWriterCount == 0) { + System.out.println(new Date() + " close writer " + new Date()); + currentTsWriter.close(); + currentTsWriter = WriterBuilder.getTsWriter(outTsDir); + } + + if (castStringsIfPossible) { + currentTsWriter.addRow(castToNumericIfPossible(values)); + } else { + currentTsWriter.addRow(values); + } + processPartitions(values); + currentWriterCount++; + if (currentWriterCount >= rowPerFile) { + currentWriterCount = 0; + } + + if (linesMigrated++ % LOG_BATCH == 0) { + System.out.println(new Date() + " migrated = " + linesMigrated + " in " + (System.currentTimeMillis() - stepOkLineTs) + " partitions = " + partitions.size()); + stepOkLineTs = System.currentTimeMillis(); + } + } catch (Exception ex) { + System.out.println(ex.getMessage() + " -> " + line); + } + + } + } + + long endTs = System.currentTimeMillis(); + System.out.println(); + System.out.println(new Date() + " Migrated rows " + linesMigrated + " in " + (endTs - startTs)); + System.out.println("Partitions collected " + partitions.size()); + + startTs = System.currentTimeMillis(); + for (String partition : partitions) { + String[] split = partition.split("\\|"); + List values = Lists.newArrayList(); + values.add(split[0]); + values.add(UUID.fromString(split[1])); + values.add(split[2]); + values.add(Long.parseLong(split[3])); + currentPartitionWriter.addRow(values); + } + currentPartitionWriter.close(); + endTs = System.currentTimeMillis(); + System.out.println(); + System.out.println(); + System.out.println(new Date() + " Migrated partitions " + partitions.size() + " in " + (endTs - startTs)); + + + currentTsWriter.close(); + System.out.println(); + System.out.println("Finished migrate Telemetry"); + } + + private static List castToNumericIfPossible(List values) { + try { + if (values.get(6) != null && NumberUtils.isNumber(values.get(6).toString())) { + Double casted = NumberUtils.createDouble(values.get(6).toString()); + List numeric = Lists.newArrayList(); + numeric.addAll(values); + numeric.set(6, null); + numeric.set(8, casted); + castedOk++; + return numeric; + } + } catch (Throwable th) { + castErrors++; + } + return values; + } + + private static void processPartitions(List values) { + String key = values.get(0) + "|" + values.get(1) + "|" + values.get(2) + "|" + values.get(3); + partitions.add(key); + } + + private static List toValues(List raw) { + //expected Table structure: +// COPY public.ts_kv (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) FROM stdin; + + + List result = new ArrayList<>(); + result.add(raw.get(0)); + result.add(fromString(raw.get(1))); + result.add(raw.get(2)); + + long ts = Long.parseLong(raw.get(3)); + long partition = toPartitionTs(ts); + result.add(partition); + result.add(ts); + + result.add(raw.get(4).equals("\\N") ? null : raw.get(4).equals("t") ? Boolean.TRUE : Boolean.FALSE); + result.add(raw.get(5).equals("\\N") ? null : raw.get(5)); + result.add(raw.get(6).equals("\\N") ? null : Long.parseLong(raw.get(6))); + result.add(raw.get(7).equals("\\N") ? null : Double.parseDouble(raw.get(7))); + return result; + } + + public static UUID fromString(String src) { + return UUID.fromString(src.substring(7, 15) + "-" + src.substring(3, 7) + "-1" + + src.substring(0, 3) + "-" + src.substring(15, 19) + "-" + src.substring(19)); + } + + private static long toPartitionTs(long ts) { + LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); + return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1).toInstant(ZoneOffset.UTC).toEpochMilli(); +// return TsPartitionDate.MONTHS.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli(); + } + + private static boolean isBlockStarted(String line) { + return line.startsWith("COPY public.ts_kv"); + } + + private static boolean isBlockFinished(String line) { + return StringUtils.isBlank(line) || line.equals("\\."); + } + +} diff --git a/tools/src/main/java/org/thingsboard/client/tools/migrator/README.md b/tools/src/main/java/org/thingsboard/client/tools/migrator/README.md new file mode 100644 index 0000000000..da79f39701 --- /dev/null +++ b/tools/src/main/java/org/thingsboard/client/tools/migrator/README.md @@ -0,0 +1,62 @@ +# Description: +Tool used for migrating ThingsBoard into hybrid mode from Postgres. +Performance of this tool depends on disk type and instance type (mostly on CPU resources). +But in general here are few benchmarks: +1. Creating Dump of the postgres ts_kv table -> 100GB = 90 minutes +2. If postgres table has size 100GB then dump file will be about 30GB +3. Generation SSTables from dump -> 100GB = 3 hours +4. 100GB Dump file will be converted into SSTable with size about 18GB + +# How to build Tool: +Switch to `tools` project in Command Line and execute +`mvn clean compile assembly:single` +It will generate single jar with dependencies. + +# Instruction: + +1. Dump Telemetry table from the source Postgres table. Do not use compression if possible because Tool can only work with uncompressed file +dump ts_kv table -> `pg_dump -h localhost -U postgres -d thingsboard -t ts_kv > ts_kv.dmp` + +2. Dump Latest Telemetry table from the source Postgres table. Do not use compression if possible because Tool can only work with uncompressed file +dump ts_kv_latest -> `pg_dump -h localhost -U postgres -d thingsboard -t ts_kv_latest > ts_kv_latest.dmp` + +3. [Optional] - move dumped files to the machine where cassandra will be hosted + +4. Prepare directory structure: +Tool will use 3 different directories for saving SSTables - ts_kv_cf, ts_kv_latest_cf, ts_kv_partitions_cf +Create 3 empty directories. For example: + /home/ubunut/migration/ts + /home/ubunut/migration/ts_latest + /home/ubunut/migration/ts_partition + +5. Run tool: +*Note: if you run this tool on remote instance - don't forget to execute this command in screen to avoid unexpected termination +java -jar ./tools-2.4.1-SNAPSHOT-jar-with-dependencies.jar + -latestFrom ./source/ts_kv_latest.dmp + -latestOut /home/ubunut/migration/ts_latest + -tsFrom ./source/ts_kv.dmp + -tsOut /home/ubunut/migration/ts + -partitionsOut /home/ubunut/migration/ts_partition + -castEnable false + + +## After tool finished +1. install Cassandra on the instance +2. Using `cqlsh` create `thingsboard` keyspace and requred tables from this file `schema-ts.cql` +3. Stop Cassandra +4. Copy generated SSTable files into cassandra data dir: + sudo find /home/ubunut/migration/ts -name '*.*' -exec mv {} /var/lib/cassandra/data/thingsboard/ts_kv_cf-0e9aaf00ee5511e9a5fa7d6f489ffd13/ \; + sudo find /home/ubunut/migration/ts_latest -name '*.*' -exec mv {} /var/lib/cassandra/data/thingsboard/ts_kv_latest_cf-161449d0ee5511e9a5fa7d6f489ffd13/ \; + sudo find /home/ubunut/migration/ts_partition -name '*.*' -exec mv {} /var/lib/cassandra/data/thingsboard/ts_kv_partitions_cf-12e8fa80ee5511e9a5fa7d6f489ffd13/ \; + +5. Start Cassandra service and trigger compaction + trigger compactions: nodetool compact thingsboard + check compaction status: nodetool compactionstats + +6. Switch Thignsboard to hybrid mode: +Modify Thingsboard properites file `thingsboard.yml` + - DATABASE_TS_TYPE = cassandra + - TS_KV_PARTITIONING = MONTHS + - [optional] - connection properties for cassandra + +7. Start Thingsboard \ No newline at end of file diff --git a/tools/src/main/java/org/thingsboard/client/tools/migrator/WriterBuilder.java b/tools/src/main/java/org/thingsboard/client/tools/migrator/WriterBuilder.java new file mode 100644 index 0000000000..73be1a7fd1 --- /dev/null +++ b/tools/src/main/java/org/thingsboard/client/tools/migrator/WriterBuilder.java @@ -0,0 +1,84 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.client.tools.migrator; + +import org.apache.cassandra.io.sstable.CQLSSTableWriter; + +import java.io.File; + +public class WriterBuilder { + + private static final String tsSchema = "CREATE TABLE thingsboard.ts_kv_cf (\n" + + " entity_type text, // (DEVICE, CUSTOMER, TENANT)\n" + + " entity_id timeuuid,\n" + + " key text,\n" + + " partition bigint,\n" + + " ts bigint,\n" + + " bool_v boolean,\n" + + " str_v text,\n" + + " long_v bigint,\n" + + " dbl_v double,\n" + + " PRIMARY KEY (( entity_type, entity_id, key, partition ), ts)\n" + + ");"; + + private static final String latestSchema = "CREATE TABLE IF NOT EXISTS thingsboard.ts_kv_latest_cf (\n" + + " entity_type text, // (DEVICE, CUSTOMER, TENANT)\n" + + " entity_id timeuuid,\n" + + " key text,\n" + + " ts bigint,\n" + + " bool_v boolean,\n" + + " str_v text,\n" + + " long_v bigint,\n" + + " dbl_v double,\n" + + " PRIMARY KEY (( entity_type, entity_id ), key)\n" + + ") WITH compaction = { 'class' : 'LeveledCompactionStrategy' };"; + + private static final String partitionSchema = "CREATE TABLE IF NOT EXISTS thingsboard.ts_kv_partitions_cf (\n" + + " entity_type text, // (DEVICE, CUSTOMER, TENANT)\n" + + " entity_id timeuuid,\n" + + " key text,\n" + + " partition bigint,\n" + + " PRIMARY KEY (( entity_type, entity_id, key ), partition)\n" + + ") WITH CLUSTERING ORDER BY ( partition ASC )\n" + + " AND compaction = { 'class' : 'LeveledCompactionStrategy' };"; + + public static CQLSSTableWriter getTsWriter(File dir) { + return CQLSSTableWriter.builder() + .inDirectory(dir) + .forTable(tsSchema) + .using("INSERT INTO thingsboard.ts_kv_cf (entity_type, entity_id, key, partition, ts, bool_v, str_v, long_v, dbl_v) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)") + .build(); + } + + public static CQLSSTableWriter getLatestWriter(File dir) { + return CQLSSTableWriter.builder() + .inDirectory(dir) + .forTable(latestSchema) + .using("INSERT INTO thingsboard.ts_kv_latest_cf (entity_type, entity_id, key, ts, bool_v, str_v, long_v, dbl_v) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)") + .build(); + } + + public static CQLSSTableWriter getPartitionWriter(File dir) { + return CQLSSTableWriter.builder() + .inDirectory(dir) + .forTable(partitionSchema) + .using("INSERT INTO thingsboard.ts_kv_partitions_cf (entity_type, entity_id, key, partition) " + + "VALUES (?, ?, ?, ?)") + .build(); + } +}