Cassandra tests moved from cassandra-unit to testcontainers

This commit is contained in:
Sergey Matvienko 2023-02-28 16:25:11 +01:00
parent ebdcda773c
commit 27f5f8c077
16 changed files with 122 additions and 1413 deletions

View File

@ -144,21 +144,6 @@
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.thingsboard</groupId>
<artifactId>ui-ngx</artifactId>
@ -329,6 +314,11 @@
<artifactId>spring-test-dbunit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>cassandra</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>

View File

@ -15,30 +15,14 @@
*/
package org.thingsboard.server.transport;
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.extensions.cpsuite.ClasspathSuite;
import org.junit.runner.RunWith;
import org.thingsboard.server.dao.CustomCassandraCQLUnit;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import java.util.Arrays;
import org.thingsboard.server.dao.AbstractNoSqlContainer;
@RunWith(ClasspathSuite.class)
@ClasspathSuite.ClassnameFilters({
"org.thingsboard.server.transport.*.telemetry.timeseries.nosql.*Test",
})
public class TransportNoSqlTestSuite {
@ClassRule
public static CustomCassandraCQLUnit cassandraUnit =
new CustomCassandraCQLUnit(
Arrays.asList(
new ClassPathCQLDataSet("cassandra/schema-keyspace.cql", false, false),
new ClassPathCQLDataSet("cassandra/schema-ts.cql", false, false),
new ClassPathCQLDataSet("cassandra/schema-ts-latest.cql", false, false)
),
"cassandra-test.yaml", 30000l);
public class TransportNoSqlTestSuite extends AbstractNoSqlContainer {
}

View File

@ -13,9 +13,10 @@
<logger name="org.springframework" level="WARN"/>
<logger name="org.springframework.boot.test" level="WARN"/>
<logger name="org.apache.cassandra" level="WARN"/>
<logger name="org.cassandraunit" level="INFO"/>
<logger name="org.testcontainers" level="INFO" />
<logger name="org.eclipse.leshan" level="INFO"/>
<!-- mute TelemetryEdgeSqlTest that causes a lot of randomly generated errors -->
<logger name="org.thingsboard.server.service.edge.rpc.EdgeGrpcSession" level="OFF"/>

View File

@ -162,26 +162,6 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-thrift</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
@ -211,6 +191,11 @@
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>cassandra</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>

View File

@ -15,7 +15,7 @@
--
CREATE TABLE IF NOT EXISTS thingsboard.ts_kv_latest_cf (
entity_type text, // (DEVICE, CUSTOMER, TENANT)
entity_type text, -- (DEVICE, CUSTOMER, TENANT)
entity_id timeuuid,
key text,
ts bigint,

View File

@ -15,7 +15,7 @@
--
CREATE TABLE IF NOT EXISTS thingsboard.ts_kv_cf (
entity_type text, // (DEVICE, CUSTOMER, TENANT)
entity_type text, -- (DEVICE, CUSTOMER, TENANT)
entity_id timeuuid,
key text,
partition bigint,
@ -29,7 +29,7 @@ CREATE TABLE IF NOT EXISTS thingsboard.ts_kv_cf (
);
CREATE TABLE IF NOT EXISTS thingsboard.ts_kv_partitions_cf (
entity_type text, // (DEVICE, CUSTOMER, TENANT)
entity_type text, -- (DEVICE, CUSTOMER, TENANT)
entity_id timeuuid,
key text,
partition bigint,

View File

@ -1,366 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable;
import java.io.File;
import java.io.IOError;
import java.io.IOException;
import java.util.*;
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CharMatcher;
import com.google.common.base.Objects;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer;
import org.apache.cassandra.io.sstable.metadata.MetadataSerializer;
import org.apache.cassandra.utils.Pair;
import static org.apache.cassandra.io.sstable.Component.separator;
/**
* A SSTable is described by the keyspace and column family it contains data
* for, a generation (where higher generations contain more recent data) and
* an alphabetic version string.
*
* A descriptor can be marked as temporary, which influences generated filenames.
*/
public class Descriptor
{
public static String TMP_EXT = ".tmp";
/** canonicalized path to the directory where SSTable resides */
public final File directory;
/** version has the following format: <code>[a-z]+</code> */
public final Version version;
public final String ksname;
public final String cfname;
public final int generation;
public final SSTableFormat.Type formatType;
/** digest component - might be {@code null} for old, legacy sstables */
public final Component digestComponent;
private final int hashCode;
/**
* A descriptor that assumes CURRENT_VERSION.
*/
@VisibleForTesting
public Descriptor(File directory, String ksname, String cfname, int generation)
{
this(SSTableFormat.Type.current().info.getLatestVersion(), directory, ksname, cfname, generation, SSTableFormat.Type.current(), null);
}
/**
* Constructor for sstable writers only.
*/
public Descriptor(File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
{
this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType, Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType()));
}
@VisibleForTesting
public Descriptor(String version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
{
this(formatType.info.getVersion(version), directory, ksname, cfname, generation, formatType, Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType()));
}
public Descriptor(Version version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType, Component digestComponent)
{
assert version != null && directory != null && ksname != null && cfname != null && formatType.info.getLatestVersion().getClass().equals(version.getClass());
this.version = version;
try
{
this.directory = directory.getCanonicalFile();
}
catch (IOException e)
{
throw new IOError(e);
}
this.ksname = ksname;
this.cfname = cfname;
this.generation = generation;
this.formatType = formatType;
this.digestComponent = digestComponent;
hashCode = Objects.hashCode(version, this.directory, generation, ksname, cfname, formatType);
}
public Descriptor withGeneration(int newGeneration)
{
return new Descriptor(version, directory, ksname, cfname, newGeneration, formatType, digestComponent);
}
public Descriptor withFormatType(SSTableFormat.Type newType)
{
return new Descriptor(newType.info.getLatestVersion(), directory, ksname, cfname, generation, newType, digestComponent);
}
public Descriptor withDigestComponent(Component newDigestComponent)
{
return new Descriptor(version, directory, ksname, cfname, generation, formatType, newDigestComponent);
}
public String tmpFilenameFor(Component component)
{
return filenameFor(component) + TMP_EXT;
}
public String filenameFor(Component component)
{
return baseFilename() + separator + component.name();
}
public String baseFilename()
{
StringBuilder buff = new StringBuilder();
buff.append(directory).append(File.separatorChar);
appendFileName(buff);
return buff.toString();
}
private void appendFileName(StringBuilder buff)
{
if (!version.hasNewFileName())
{
buff.append(ksname).append(separator);
buff.append(cfname).append(separator);
}
buff.append(version).append(separator);
buff.append(generation);
if (formatType != SSTableFormat.Type.LEGACY)
buff.append(separator).append(formatType.name);
}
public String relativeFilenameFor(Component component)
{
final StringBuilder buff = new StringBuilder();
appendFileName(buff);
buff.append(separator).append(component.name());
return buff.toString();
}
public SSTableFormat getFormat()
{
return formatType.info;
}
/** Return any temporary files found in the directory */
public List<File> getTemporaryFiles()
{
List<File> ret = new ArrayList<>();
File[] tmpFiles = directory.listFiles((dir, name) ->
name.endsWith(Descriptor.TMP_EXT));
for (File tmpFile : tmpFiles)
ret.add(tmpFile);
return ret;
}
/**
* Files obsoleted by CASSANDRA-7066 : temporary files and compactions_in_progress. We support
* versions 2.1 (ka) and 2.2 (la).
* Temporary files have tmp- or tmplink- at the beginning for 2.2 sstables or after ks-cf- for 2.1 sstables
*/
private final static String LEGACY_COMP_IN_PROG_REGEX_STR = "^compactions_in_progress(\\-[\\d,a-f]{32})?$";
private final static Pattern LEGACY_COMP_IN_PROG_REGEX = Pattern.compile(LEGACY_COMP_IN_PROG_REGEX_STR);
private final static String LEGACY_TMP_REGEX_STR = "^((.*)\\-(.*)\\-)?tmp(link)?\\-((?:l|k).)\\-(\\d)*\\-(.*)$";
private final static Pattern LEGACY_TMP_REGEX = Pattern.compile(LEGACY_TMP_REGEX_STR);
public static boolean isLegacyFile(File file)
{
if (file.isDirectory())
return file.getParentFile() != null &&
file.getParentFile().getName().equalsIgnoreCase("system") &&
LEGACY_COMP_IN_PROG_REGEX.matcher(file.getName()).matches();
else
return LEGACY_TMP_REGEX.matcher(file.getName()).matches();
}
public static boolean isValidFile(String fileName)
{
return fileName.endsWith(".db") && !LEGACY_TMP_REGEX.matcher(fileName).matches();
}
/**
* @see #fromFilename(File directory, String name)
* @param filename The SSTable filename
* @return Descriptor of the SSTable initialized from filename
*/
public static Descriptor fromFilename(String filename)
{
return fromFilename(filename, false);
}
public static Descriptor fromFilename(String filename, SSTableFormat.Type formatType)
{
return fromFilename(filename).withFormatType(formatType);
}
public static Descriptor fromFilename(String filename, boolean skipComponent)
{
File file = new File(filename).getAbsoluteFile();
return fromFilename(file.getParentFile(), file.getName(), skipComponent).left;
}
public static Pair<Descriptor, String> fromFilename(File directory, String name)
{
return fromFilename(directory, name, false);
}
/**
* Filename of the form is vary by version:
*
* <ul>
* <li>&lt;ksname&gt;-&lt;cfname&gt;-(tmp-)?&lt;version&gt;-&lt;gen&gt;-&lt;component&gt; for cassandra 2.0 and before</li>
* <li>(&lt;tmp marker&gt;-)?&lt;version&gt;-&lt;gen&gt;-&lt;component&gt; for cassandra 3.0 and later</li>
* </ul>
*
* If this is for SSTable of secondary index, directory should ends with index name for 2.1+.
*
* @param directory The directory of the SSTable files
* @param name The name of the SSTable file
* @param skipComponent true if the name param should not be parsed for a component tag
*
* @return A Descriptor for the SSTable, and the Component remainder.
*/
@SuppressWarnings("deprecation")
public static Pair<Descriptor, String> fromFilename(File directory, String name, boolean skipComponent)
{
File parentDirectory = directory != null ? directory : new File(".");
// tokenize the filename
StringTokenizer st = new StringTokenizer(name, String.valueOf(separator));
String nexttok;
// read tokens backwards to determine version
Deque<String> tokenStack = new ArrayDeque<>();
while (st.hasMoreTokens())
{
tokenStack.push(st.nextToken());
}
// component suffix
String component = skipComponent ? null : tokenStack.pop();
nexttok = tokenStack.pop();
// generation OR format type
SSTableFormat.Type fmt = SSTableFormat.Type.LEGACY;
if (!CharMatcher.digit().matchesAllOf(nexttok))
{
fmt = SSTableFormat.Type.validate(nexttok);
nexttok = tokenStack.pop();
}
// generation
int generation = Integer.parseInt(nexttok);
// version
nexttok = tokenStack.pop();
if (!Version.validate(nexttok))
throw new UnsupportedOperationException("SSTable " + name + " is too old to open. Upgrade to 2.0 first, and run upgradesstables");
Version version = fmt.info.getVersion(nexttok);
// ks/cf names
String ksname, cfname;
if (version.hasNewFileName())
{
// for 2.1+ read ks and cf names from directory
File cfDirectory = parentDirectory;
// check if this is secondary index
String indexName = "";
if (cfDirectory.getName().startsWith(Directories.SECONDARY_INDEX_NAME_SEPARATOR))
{
indexName = cfDirectory.getName();
cfDirectory = cfDirectory.getParentFile();
}
if (cfDirectory.getName().equals(Directories.BACKUPS_SUBDIR))
{
cfDirectory = cfDirectory.getParentFile();
}
else if (cfDirectory.getParentFile().getName().equals(Directories.SNAPSHOT_SUBDIR))
{
cfDirectory = cfDirectory.getParentFile().getParentFile();
}
cfname = cfDirectory.getName().split("-")[0] + indexName;
ksname = cfDirectory.getParentFile().getName();
}
else
{
cfname = tokenStack.pop();
ksname = tokenStack.pop();
}
assert tokenStack.isEmpty() : "Invalid file name " + name + " in " + directory;
return Pair.create(new Descriptor(version, parentDirectory, ksname, cfname, generation, fmt,
// _assume_ version from version
Component.digestFor(version.uncompressedChecksumType())),
component);
}
@SuppressWarnings("deprecation")
public IMetadataSerializer getMetadataSerializer()
{
if (version.hasNewStatsFile())
return new MetadataSerializer();
else
return new LegacyMetadataSerializer();
}
/**
* @return true if the current Cassandra version can read the given sstable version
*/
public boolean isCompatible()
{
return version.isCompatible();
}
@Override
public String toString()
{
return baseFilename();
}
@Override
public boolean equals(Object o)
{
if (o == this)
return true;
if (!(o instanceof Descriptor))
return false;
Descriptor that = (Descriptor)o;
return that.directory.equals(this.directory)
&& that.generation == this.generation
&& that.ksname.equals(this.ksname)
&& that.cfname.equals(this.cfname)
&& that.formatType == this.formatType;
}
@Override
public int hashCode()
{
return hashCode;
}
}

View File

@ -1,86 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable.format;
import com.google.common.base.CharMatcher;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
/**
* Provides the accessors to data on disk.
*/
public interface SSTableFormat
{
static boolean enableSSTableDevelopmentTestMode = Boolean.getBoolean("cassandra.test.sstableformatdevelopment");
Version getLatestVersion();
Version getVersion(String version);
SSTableWriter.Factory getWriterFactory();
SSTableReader.Factory getReaderFactory();
RowIndexEntry.IndexSerializer<?> getIndexSerializer(CFMetaData cfm, Version version, SerializationHeader header);
public static enum Type
{
//Used internally to refer to files with no
//format flag in the filename
LEGACY("big", BigFormat.instance),
//The original sstable format
BIG("big", BigFormat.instance);
public final SSTableFormat info;
public final String name;
public static Type current()
{
return BIG;
}
@SuppressWarnings("deprecation")
private Type(String name, SSTableFormat info)
{
//Since format comes right after generation
//we disallow formats with numeric names
// We have removed this check for compatibility with the embedded cassandra used for tests.
assert !CharMatcher.digit().matchesAllOf(name);
this.name = name;
this.info = info;
}
public static Type validate(String name)
{
for (Type valid : Type.values())
{
//This is used internally for old sstables
if (valid == LEGACY)
continue;
if (valid.name.equalsIgnoreCase(name))
return valid;
}
throw new IllegalArgumentException("No Type constant " + name);
}
}
}

View File

@ -1,760 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.util;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileAttributeView;
import java.nio.file.attribute.FileStoreAttributeView;
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.FSErrorHandler;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.utils.JVMStabilityInspector;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static org.apache.cassandra.utils.Throwables.maybeFail;
import static org.apache.cassandra.utils.Throwables.merge;
public final class FileUtils
{
public static final Charset CHARSET = StandardCharsets.UTF_8;
private static final Logger logger = LoggerFactory.getLogger(FileUtils.class);
public static final long ONE_KB = 1024;
public static final long ONE_MB = 1024 * ONE_KB;
public static final long ONE_GB = 1024 * ONE_MB;
public static final long ONE_TB = 1024 * ONE_GB;
private static final DecimalFormat df = new DecimalFormat("#.##");
public static final boolean isCleanerAvailable = false;
private static final AtomicReference<Optional<FSErrorHandler>> fsErrorHandler = new AtomicReference<>(Optional.empty());
public static void createHardLink(String from, String to)
{
createHardLink(new File(from), new File(to));
}
public static void createHardLink(File from, File to)
{
if (to.exists())
throw new RuntimeException("Tried to create duplicate hard link to " + to);
if (!from.exists())
throw new RuntimeException("Tried to hard link to file that does not exist " + from);
try
{
Files.createLink(to.toPath(), from.toPath());
}
catch (IOException e)
{
throw new FSWriteError(e, to);
}
}
public static File createTempFile(String prefix, String suffix, File directory)
{
try
{
return File.createTempFile(prefix, suffix, directory);
}
catch (IOException e)
{
throw new FSWriteError(e, directory);
}
}
public static File createTempFile(String prefix, String suffix)
{
return createTempFile(prefix, suffix, new File(System.getProperty("java.io.tmpdir")));
}
public static Throwable deleteWithConfirm(String filePath, boolean expect, Throwable accumulate)
{
return deleteWithConfirm(new File(filePath), expect, accumulate);
}
public static Throwable deleteWithConfirm(File file, boolean expect, Throwable accumulate)
{
boolean exists = file.exists();
assert exists || !expect : "attempted to delete non-existing file " + file.getName();
try
{
if (exists)
Files.delete(file.toPath());
}
catch (Throwable t)
{
try
{
throw new FSWriteError(t, file);
}
catch (Throwable t2)
{
accumulate = merge(accumulate, t2);
}
}
return accumulate;
}
public static void deleteWithConfirm(String file)
{
deleteWithConfirm(new File(file));
}
public static void deleteWithConfirm(File file)
{
maybeFail(deleteWithConfirm(file, true, null));
}
public static void renameWithOutConfirm(String from, String to)
{
try
{
atomicMoveWithFallback(new File(from).toPath(), new File(to).toPath());
}
catch (IOException e)
{
if (logger.isTraceEnabled())
logger.trace("Could not move file "+from+" to "+to, e);
}
}
public static void renameWithConfirm(String from, String to)
{
renameWithConfirm(new File(from), new File(to));
}
public static void renameWithConfirm(File from, File to)
{
assert from.exists();
if (logger.isTraceEnabled())
logger.trace("Renaming {} to {}", from.getPath(), to.getPath());
// this is not FSWE because usually when we see it it's because we didn't close the file before renaming it,
// and Windows is picky about that.
try
{
atomicMoveWithFallback(from.toPath(), to.toPath());
}
catch (IOException e)
{
throw new RuntimeException(String.format("Failed to rename %s to %s", from.getPath(), to.getPath()), e);
}
}
/**
* Move a file atomically, if it fails, it falls back to a non-atomic operation
* @param from
* @param to
* @throws IOException
*/
private static void atomicMoveWithFallback(Path from, Path to) throws IOException
{
try
{
Files.move(from, to, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
}
catch (AtomicMoveNotSupportedException e)
{
logger.trace("Could not do an atomic move", e);
Files.move(from, to, StandardCopyOption.REPLACE_EXISTING);
}
}
public static void truncate(String path, long size)
{
try(FileChannel channel = FileChannel.open(Paths.get(path), StandardOpenOption.READ, StandardOpenOption.WRITE))
{
channel.truncate(size);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
public static void closeQuietly(Closeable c)
{
try
{
if (c != null)
c.close();
}
catch (Exception e)
{
logger.warn("Failed closing {}", c, e);
}
}
public static void closeQuietly(AutoCloseable c)
{
try
{
if (c != null)
c.close();
}
catch (Exception e)
{
logger.warn("Failed closing {}", c, e);
}
}
public static void close(Closeable... cs) throws IOException
{
close(Arrays.asList(cs));
}
public static void close(Iterable<? extends Closeable> cs) throws IOException
{
Throwable e = null;
for (Closeable c : cs)
{
try
{
if (c != null)
c.close();
}
catch (Throwable ex)
{
if (e == null) e = ex;
else e.addSuppressed(ex);
logger.warn("Failed closing stream {}", c, ex);
}
}
maybeFail(e, IOException.class);
}
public static void closeQuietly(Iterable<? extends AutoCloseable> cs)
{
for (AutoCloseable c : cs)
{
try
{
if (c != null)
c.close();
}
catch (Exception ex)
{
logger.warn("Failed closing {}", c, ex);
}
}
}
public static String getCanonicalPath(String filename)
{
try
{
return new File(filename).getCanonicalPath();
}
catch (IOException e)
{
throw new FSReadError(e, filename);
}
}
public static String getCanonicalPath(File file)
{
try
{
return file.getCanonicalPath();
}
catch (IOException e)
{
throw new FSReadError(e, file);
}
}
/** Return true if file is contained in folder */
public static boolean isContained(File folder, File file)
{
Path folderPath = Paths.get(getCanonicalPath(folder));
Path filePath = Paths.get(getCanonicalPath(file));
return filePath.startsWith(folderPath);
}
/** Convert absolute path into a path relative to the base path */
public static String getRelativePath(String basePath, String path)
{
try
{
return Paths.get(basePath).relativize(Paths.get(path)).toString();
}
catch(Exception ex)
{
String absDataPath = FileUtils.getCanonicalPath(basePath);
return Paths.get(absDataPath).relativize(Paths.get(path)).toString();
}
}
public static void clean(ByteBuffer buffer)
{
if (buffer == null)
return;
}
public static void createDirectory(String directory)
{
createDirectory(new File(directory));
}
public static void createDirectory(File directory)
{
if (!directory.exists())
{
if (!directory.mkdirs())
throw new FSWriteError(new IOException("Failed to mkdirs " + directory), directory);
}
}
public static boolean delete(String file)
{
File f = new File(file);
return f.delete();
}
public static void delete(File... files)
{
if (files == null)
{
// CASSANDRA-13389: some callers use Files.listFiles() which, on error, silently returns null
logger.debug("Received null list of files to delete");
return;
}
for ( File file : files )
{
file.delete();
}
}
public static void deleteAsync(final String file)
{
Runnable runnable = new Runnable()
{
public void run()
{
deleteWithConfirm(new File(file));
}
};
ScheduledExecutors.nonPeriodicTasks.execute(runnable);
}
public static void visitDirectory(Path dir, Predicate<? super File> filter, Consumer<? super File> consumer)
{
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir))
{
StreamSupport.stream(stream.spliterator(), false)
.map(Path::toFile)
// stream directories are weakly consistent so we always check if the file still exists
.filter(f -> f.exists() && (filter == null || filter.test(f)))
.forEach(consumer);
}
catch (IOException|DirectoryIteratorException ex)
{
logger.error("Failed to list files in {} with exception: {}", dir, ex.getMessage(), ex);
}
}
public static String stringifyFileSize(double value)
{
double d;
if ( value >= ONE_TB )
{
d = value / ONE_TB;
String val = df.format(d);
return val + " TiB";
}
else if ( value >= ONE_GB )
{
d = value / ONE_GB;
String val = df.format(d);
return val + " GiB";
}
else if ( value >= ONE_MB )
{
d = value / ONE_MB;
String val = df.format(d);
return val + " MiB";
}
else if ( value >= ONE_KB )
{
d = value / ONE_KB;
String val = df.format(d);
return val + " KiB";
}
else
{
String val = df.format(value);
return val + " bytes";
}
}
/**
* Deletes all files and subdirectories under "dir".
* @param dir Directory to be deleted
* @throws FSWriteError if any part of the tree cannot be deleted
*/
public static void deleteRecursive(File dir)
{
if (dir.isDirectory())
{
String[] children = dir.list();
for (String child : children)
deleteRecursive(new File(dir, child));
}
// The directory is now empty so now it can be smoked
deleteWithConfirm(dir);
}
/**
* Schedules deletion of all file and subdirectories under "dir" on JVM shutdown.
* @param dir Directory to be deleted
*/
public static void deleteRecursiveOnExit(File dir)
{
if (dir.isDirectory())
{
String[] children = dir.list();
for (String child : children)
deleteRecursiveOnExit(new File(dir, child));
}
logger.trace("Scheduling deferred deletion of file: {}", dir);
dir.deleteOnExit();
}
public static void handleCorruptSSTable(CorruptSSTableException e)
{
fsErrorHandler.get().ifPresent(handler -> handler.handleCorruptSSTable(e));
}
public static void handleFSError(FSError e)
{
fsErrorHandler.get().ifPresent(handler -> handler.handleFSError(e));
}
/**
* handleFSErrorAndPropagate will invoke the disk failure policy error handler,
* which may or may not stop the daemon or transports. However, if we don't exit,
* we still want to propagate the exception to the caller in case they have custom
* exception handling
*
* @param e A filesystem error
*/
public static void handleFSErrorAndPropagate(FSError e)
{
JVMStabilityInspector.inspectThrowable(e);
throwIfUnchecked(e);
throw new RuntimeException(e);
}
/**
* Get the size of a directory in bytes
* @param folder The directory for which we need size.
* @return The size of the directory
*/
public static long folderSize(File folder)
{
final long [] sizeArr = {0L};
try
{
Files.walkFileTree(folder.toPath(), new SimpleFileVisitor<Path>()
{
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
{
sizeArr[0] += attrs.size();
return FileVisitResult.CONTINUE;
}
});
}
catch (IOException e)
{
logger.error("Error while getting {} folder size. {}", folder, e);
}
return sizeArr[0];
}
public static void copyTo(DataInput in, OutputStream out, int length) throws IOException
{
byte[] buffer = new byte[64 * 1024];
int copiedBytes = 0;
while (copiedBytes + buffer.length < length)
{
in.readFully(buffer);
out.write(buffer);
copiedBytes += buffer.length;
}
if (copiedBytes < length)
{
int left = length - copiedBytes;
in.readFully(buffer, 0, left);
out.write(buffer, 0, left);
}
}
public static boolean isSubDirectory(File parent, File child) throws IOException
{
parent = parent.getCanonicalFile();
child = child.getCanonicalFile();
File toCheck = child;
while (toCheck != null)
{
if (parent.equals(toCheck))
return true;
toCheck = toCheck.getParentFile();
}
return false;
}
public static void append(File file, String ... lines)
{
if (file.exists())
write(file, Arrays.asList(lines), StandardOpenOption.APPEND);
else
write(file, Arrays.asList(lines), StandardOpenOption.CREATE);
}
public static void appendAndSync(File file, String ... lines)
{
if (file.exists())
write(file, Arrays.asList(lines), StandardOpenOption.APPEND, StandardOpenOption.SYNC);
else
write(file, Arrays.asList(lines), StandardOpenOption.CREATE, StandardOpenOption.SYNC);
}
public static void replace(File file, String ... lines)
{
write(file, Arrays.asList(lines), StandardOpenOption.TRUNCATE_EXISTING);
}
public static void write(File file, List<String> lines, StandardOpenOption ... options)
{
try
{
Files.write(file.toPath(),
lines,
CHARSET,
options);
}
catch (IOException ex)
{
throw new RuntimeException(ex);
}
}
public static List<String> readLines(File file)
{
try
{
return Files.readAllLines(file.toPath(), CHARSET);
}
catch (IOException ex)
{
if (ex instanceof NoSuchFileException)
return Collections.emptyList();
throw new RuntimeException(ex);
}
}
public static void setFSErrorHandler(FSErrorHandler handler)
{
fsErrorHandler.getAndSet(Optional.ofNullable(handler));
}
/**
* Returns the size of the specified partition.
* <p>This method handles large file system by returning {@code Long.MAX_VALUE} if the size overflow.
* See <a href='https://bugs.openjdk.java.net/browse/JDK-8179320'>JDK-8179320</a> for more information.</p>
*
* @param file the partition
* @return the size, in bytes, of the partition or {@code 0L} if the abstract pathname does not name a partition
*/
public static long getTotalSpace(File file)
{
return handleLargeFileSystem(file.getTotalSpace());
}
/**
* Returns the number of unallocated bytes on the specified partition.
* <p>This method handles large file system by returning {@code Long.MAX_VALUE} if the number of unallocated bytes
* overflow. See <a href='https://bugs.openjdk.java.net/browse/JDK-8179320'>JDK-8179320</a> for more information</p>
*
* @param file the partition
* @return the number of unallocated bytes on the partition or {@code 0L}
* if the abstract pathname does not name a partition.
*/
public static long getFreeSpace(File file)
{
return handleLargeFileSystem(file.getFreeSpace());
}
/**
* Returns the number of available bytes on the specified partition.
* <p>This method handles large file system by returning {@code Long.MAX_VALUE} if the number of available bytes
* overflow. See <a href='https://bugs.openjdk.java.net/browse/JDK-8179320'>JDK-8179320</a> for more information</p>
*
* @param file the partition
* @return the number of available bytes on the partition or {@code 0L}
* if the abstract pathname does not name a partition.
*/
public static long getUsableSpace(File file)
{
return handleLargeFileSystem(file.getUsableSpace());
}
/**
* Returns the {@link FileStore} representing the file store where a file
* is located. This {@link FileStore} handles large file system by returning {@code Long.MAX_VALUE}
* from {@code FileStore#getTotalSpace()}, {@code FileStore#getUnallocatedSpace()} and {@code FileStore#getUsableSpace()}
* it the value is bigger than {@code Long.MAX_VALUE}. See <a href='https://bugs.openjdk.java.net/browse/JDK-8162520'>JDK-8162520</a>
* for more information.
*
* @param path the path to the file
* @return the file store where the file is stored
*/
public static FileStore getFileStore(Path path) throws IOException
{
return new SafeFileStore(Files.getFileStore(path));
}
/**
* Handle large file system by returning {@code Long.MAX_VALUE} when the size overflows.
* @param size returned by the Java's FileStore methods
* @return the size or {@code Long.MAX_VALUE} if the size was bigger than {@code Long.MAX_VALUE}
*/
private static long handleLargeFileSystem(long size)
{
return size < 0 ? Long.MAX_VALUE : size;
}
/**
* Private constructor as the class contains only static methods.
*/
private FileUtils()
{
}
/**
* FileStore decorator used to safely handle large file system.
*
* <p>Java's FileStore methods (getTotalSpace/getUnallocatedSpace/getUsableSpace) are limited to reporting bytes as
* signed long (2^63-1), if the filesystem is any bigger, then the size overflows. {@code SafeFileStore} will
* return {@code Long.MAX_VALUE} if the size overflow.</p>
*
* @see https://bugs.openjdk.java.net/browse/JDK-8162520.
*/
private static final class SafeFileStore extends FileStore
{
/**
* The decorated {@code FileStore}
*/
private final FileStore fileStore;
public SafeFileStore(FileStore fileStore)
{
this.fileStore = fileStore;
}
@Override
public String name()
{
return fileStore.name();
}
@Override
public String type()
{
return fileStore.type();
}
@Override
public boolean isReadOnly()
{
return fileStore.isReadOnly();
}
@Override
public long getTotalSpace() throws IOException
{
return handleLargeFileSystem(fileStore.getTotalSpace());
}
@Override
public long getUsableSpace() throws IOException
{
return handleLargeFileSystem(fileStore.getUsableSpace());
}
@Override
public long getUnallocatedSpace() throws IOException
{
return handleLargeFileSystem(fileStore.getUnallocatedSpace());
}
@Override
public boolean supportsFileAttributeView(Class<? extends FileAttributeView> type)
{
return fileStore.supportsFileAttributeView(type);
}
@Override
public boolean supportsFileAttributeView(String name)
{
return fileStore.supportsFileAttributeView(name);
}
@Override
public <V extends FileStoreAttributeView> V getFileStoreAttributeView(Class<V> type)
{
return fileStore.getFileStoreAttributeView(type);
}
@Override
public Object getAttribute(String attribute) throws IOException
{
return fileStore.getAttribute(attribute);
}
}
}

View File

@ -0,0 +1,96 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao;
import com.github.dockerjava.api.command.InspectContainerResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.junit.ClassRule;
import org.junit.rules.ExternalResource;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.containers.delegate.CassandraDatabaseDelegate;
import org.testcontainers.delegate.DatabaseDelegate;
import org.testcontainers.ext.ScriptUtils;
import javax.script.ScriptException;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List;
@Slf4j
public abstract class AbstractNoSqlContainer {
public static final List<String> INIT_SCRIPTS = List.of(
"cassandra/schema-keyspace.cql",
"cassandra/schema-ts.cql",
"cassandra/schema-ts-latest.cql"
);
@ClassRule(order = 0)
public static final CassandraContainer cassandra = (CassandraContainer) new CassandraContainer("cassandra:4.1") {
@Override
protected void containerIsStarted(InspectContainerResponse containerInfo) {
super.containerIsStarted(containerInfo);
DatabaseDelegate db = new CassandraDatabaseDelegate(this);
INIT_SCRIPTS.forEach(script -> runInitScriptIfRequired(db, script));
}
private void runInitScriptIfRequired(DatabaseDelegate db, String initScriptPath) {
logger().info("Init script [{}]", initScriptPath);
if (initScriptPath != null) {
try {
URL resource = Thread.currentThread().getContextClassLoader().getResource(initScriptPath);
if (resource == null) {
logger().warn("Could not load classpath init script: {}", initScriptPath);
throw new ScriptUtils.ScriptLoadException("Could not load classpath init script: " + initScriptPath + ". Resource not found.");
}
String cql = IOUtils.toString(resource, StandardCharsets.UTF_8);
ScriptUtils.executeDatabaseScript(db, initScriptPath, cql);
} catch (IOException e) {
logger().warn("Could not load classpath init script: {}", initScriptPath);
throw new ScriptUtils.ScriptLoadException("Could not load classpath init script: " + initScriptPath, e);
} catch (ScriptException e) {
logger().error("Error while executing init script: {}", initScriptPath, e);
throw new ScriptUtils.UncategorizedScriptException("Error while executing init script: " + initScriptPath, e);
}
}
}
}
.withEnv("HEAP_NEWSIZE", "64M")
.withEnv("MAX_HEAP_SIZE", "512M")
.withEnv("CASSANDRA_CLUSTER_NAME", "ThingsBoard Cluster");
@ClassRule(order = 1)
public static ExternalResource resource = new ExternalResource() {
@Override
protected void before() throws Throwable {
cassandra.start();
String cassandraUrl = String.format("%s:%s", cassandra.getHost(), cassandra.getMappedPort(9042));
log.debug("Cassandra url [{}]", cassandraUrl);
System.setProperty("cassandra.url", cassandraUrl);
}
@Override
protected void after() {
cassandra.stop();
List.of("cassandra.url")
.forEach(System.getProperties()::remove);
}
};
}

View File

@ -1,88 +0,0 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao;
import com.datastax.oss.driver.api.core.CqlSession;
import org.cassandraunit.BaseCassandraUnit;
import org.cassandraunit.CQLDataLoader;
import org.cassandraunit.dataset.CQLDataSet;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import java.util.List;
public class CustomCassandraCQLUnit extends BaseCassandraUnit {
protected List<CQLDataSet> dataSets;
public CqlSession session;
public CustomCassandraCQLUnit(List<CQLDataSet> dataSets) {
this.dataSets = dataSets;
}
public CustomCassandraCQLUnit(List<CQLDataSet> dataSets, int readTimeoutMillis) {
this.dataSets = dataSets;
this.readTimeoutMillis = readTimeoutMillis;
}
public CustomCassandraCQLUnit(List<CQLDataSet> dataSets, String configurationFileName) {
this(dataSets);
this.configurationFileName = configurationFileName;
}
public CustomCassandraCQLUnit(List<CQLDataSet> dataSets, String configurationFileName, int readTimeoutMillis) {
this(dataSets);
this.configurationFileName = configurationFileName;
this.readTimeoutMillis = readTimeoutMillis;
}
public CustomCassandraCQLUnit(List<CQLDataSet> dataSets, String configurationFileName, long startUpTimeoutMillis) {
super(startUpTimeoutMillis);
this.dataSets = dataSets;
this.configurationFileName = configurationFileName;
}
public CustomCassandraCQLUnit(List<CQLDataSet> dataSets, String configurationFileName, long startUpTimeoutMillis, int readTimeoutMillis) {
super(startUpTimeoutMillis);
this.dataSets = dataSets;
this.configurationFileName = configurationFileName;
this.readTimeoutMillis = readTimeoutMillis;
}
@Override
protected void load() {
session = EmbeddedCassandraServerHelper.getSession();
CQLDataLoader dataLoader = new CQLDataLoader(session);
dataSets.forEach(dataLoader::load);
session = dataLoader.getSession();
System.setSecurityManager(null);
}
@Override
protected void after() {
super.after();
try (CqlSession s = session) {
session = null;
}
System.setSecurityManager(null);
}
// Getters for those who do not like to directly access fields
public CqlSession getSession() {
return session;
}
}

View File

@ -15,28 +15,14 @@
*/
package org.thingsboard.server.dao;
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
import org.junit.ClassRule;
import org.junit.extensions.cpsuite.ClasspathSuite;
import org.junit.extensions.cpsuite.ClasspathSuite.ClassnameFilters;
import org.junit.runner.RunWith;
import java.util.Arrays;
@RunWith(ClasspathSuite.class)
@ClassnameFilters({
"org.thingsboard.server.dao.service.*.nosql.*ServiceNoSqlTest",
})
public class NoSqlDaoServiceTestSuite {
@ClassRule
public static CustomCassandraCQLUnit cassandraUnit =
new CustomCassandraCQLUnit(
Arrays.asList(
new ClassPathCQLDataSet("cassandra/schema-keyspace.cql", false, false),
new ClassPathCQLDataSet("cassandra/schema-ts.cql", false, false),
new ClassPathCQLDataSet("cassandra/schema-ts-latest.cql", false, false)
),
"cassandra-test.yaml", 30000L);
public class NoSqlDaoServiceTestSuite extends AbstractNoSqlContainer {
}

View File

@ -2,7 +2,7 @@ cassandra.cluster_name=Thingsboard Cluster
cassandra.keyspace_name=thingsboard
cassandra.url=127.0.0.1:9142
#cassandra.url=127.0.0.1:9142 # will be injected by NoSqlContainer
cassandra.local_datacenter=datacenter1

View File

@ -8,9 +8,7 @@
</appender>
<logger name="org.thingsboard.server.dao" level="WARN"/>
<logger name="org.apache.cassandra" level="WARN"/>
<logger name="org.cassandraunit" level="WARN" />
<logger name="org.apache.cassandra" level="WARN" />
<logger name="org.testcontainers" level="INFO" />
<root level="WARN">
<appender-ref ref="console"/>

27
pom.xml
View File

@ -126,7 +126,6 @@
<snmp4j.version>2.8.5</snmp4j.version>
<!-- TEST SCOPE -->
<awaitility.version>4.1.0</awaitility.version>
<cassandra-unit.version>4.3.1.0</cassandra-unit.version>
<dbunit.version>2.7.2</dbunit.version>
<java-websocket.version>1.5.2</java-websocket.version>
<jupiter.version>5.8.2</jupiter.version> <!-- keep the same version as spring-boot-starter-test depend on jupiter-->
@ -1603,26 +1602,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>
<version>${cassandra-unit.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<exclusion>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
@ -1746,6 +1725,12 @@
<artifactId>bcpkix-jdk15on</artifactId>
<version>${bouncycastle.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>cassandra</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>

View File

@ -150,22 +150,6 @@
<artifactId>mockserver-client-java</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>