commit
b09266a54c
@ -53,8 +53,6 @@ import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -80,6 +78,10 @@ import static org.thingsboard.server.transport.coap.efento.utils.CoapEfentoUtils
|
||||
import static org.thingsboard.server.transport.coap.efento.utils.CoapEfentoUtils.WATER_METER_ACC_MINOR_METADATA_FACTOR;
|
||||
import static org.thingsboard.server.transport.coap.efento.utils.CoapEfentoUtils.isBinarySensor;
|
||||
import static org.thingsboard.server.transport.coap.efento.utils.CoapEfentoUtils.isSensorError;
|
||||
import static org.thingsboard.server.transport.coap.efento.utils.PulseCounterType.ELEC_METER_ACC;
|
||||
import static org.thingsboard.server.transport.coap.efento.utils.PulseCounterType.PULSE_CNT_ACC;
|
||||
import static org.thingsboard.server.transport.coap.efento.utils.PulseCounterType.PULSE_CNT_ACC_WIDE;
|
||||
import static org.thingsboard.server.transport.coap.efento.utils.PulseCounterType.WATER_CNT_ACC;
|
||||
|
||||
@Slf4j
|
||||
public class CoapEfentoTransportResource extends AbstractCoapTransportResource {
|
||||
@ -300,12 +302,6 @@ public class CoapEfentoTransportResource extends AbstractCoapTransportResource {
|
||||
}
|
||||
}
|
||||
|
||||
valuesMap.values().forEach(jsonObject -> {
|
||||
for (PulseCounterType pulseCounterType : PulseCounterType.values()) {
|
||||
calculatePulseCounterTotalValue(jsonObject, pulseCounterType);
|
||||
}
|
||||
});
|
||||
|
||||
if (CollectionUtils.isEmpty(valuesMap)) {
|
||||
throw new IllegalStateException("[" + sessionId + "]: Failed to collect Efento measurements, reason, values map is empty!");
|
||||
}
|
||||
@ -338,7 +334,7 @@ public class CoapEfentoTransportResource extends AbstractCoapTransportResource {
|
||||
values.addProperty("pulse_cnt_" + channelNumber, (double) (startPoint + sampleOffset));
|
||||
break;
|
||||
case MEASUREMENT_TYPE_IAQ:
|
||||
addPulseCounterProperties(values, "iaq_", channelNumber, startPoint + sampleOffset, IAQ_METADATA_FACTOR);
|
||||
addPropertiesForMeasurementTypeWithMetadataFactor(values, "iaq_", channelNumber, startPoint + sampleOffset, IAQ_METADATA_FACTOR);
|
||||
break;
|
||||
case MEASUREMENT_TYPE_ELECTRICITY_METER:
|
||||
values.addProperty("watt_hour_" + channelNumber, (double) (startPoint + sampleOffset));
|
||||
@ -356,25 +352,25 @@ public class CoapEfentoTransportResource extends AbstractCoapTransportResource {
|
||||
values.addProperty("distance_mm_" + channelNumber, (double) (startPoint + sampleOffset));
|
||||
break;
|
||||
case MEASUREMENT_TYPE_WATER_METER_ACC_MINOR:
|
||||
addPulseCounterProperties(values, "water_cnt_acc_minor_", channelNumber, startPoint + sampleOffset, WATER_METER_ACC_MINOR_METADATA_FACTOR);
|
||||
calculateAccPulseCounterTotalValue(values, WATER_CNT_ACC , channelNumber, startPoint + sampleOffset, WATER_METER_ACC_MINOR_METADATA_FACTOR);
|
||||
break;
|
||||
case MEASUREMENT_TYPE_WATER_METER_ACC_MAJOR:
|
||||
addPulseCounterProperties(values, "water_cnt_acc_major_", channelNumber, startPoint + sampleOffset, WATER_METER_ACC_MAJOR_METADATA_FACTOR);
|
||||
addPropertiesForMeasurementTypeWithMetadataFactor(values, WATER_CNT_ACC.getPrefix(), channelNumber, startPoint + sampleOffset, WATER_METER_ACC_MAJOR_METADATA_FACTOR);
|
||||
break;
|
||||
case MEASUREMENT_TYPE_HUMIDITY_ACCURATE:
|
||||
values.addProperty("humidity_relative_" + channelNumber, (double) (startPoint + sampleOffset) / 10f);
|
||||
break;
|
||||
case MEASUREMENT_TYPE_STATIC_IAQ:
|
||||
addPulseCounterProperties(values, "static_iaq_", channelNumber, startPoint + sampleOffset, STATIC_IAQ_METADATA_FACTOR);
|
||||
addPropertiesForMeasurementTypeWithMetadataFactor(values, "static_iaq_", channelNumber, startPoint + sampleOffset, STATIC_IAQ_METADATA_FACTOR);
|
||||
break;
|
||||
case MEASUREMENT_TYPE_CO2_GAS:
|
||||
addPulseCounterProperties(values, "co2_gas_", channelNumber, startPoint + sampleOffset, CO2_GAS_METADATA_FACTOR);
|
||||
addPropertiesForMeasurementTypeWithMetadataFactor(values, "co2_gas_", channelNumber, startPoint + sampleOffset, CO2_GAS_METADATA_FACTOR);
|
||||
break;
|
||||
case MEASUREMENT_TYPE_CO2_EQUIVALENT:
|
||||
addPulseCounterProperties(values, "co2_", channelNumber, startPoint + sampleOffset, CO2_EQUIVALENT_METADATA_FACTOR);
|
||||
addPropertiesForMeasurementTypeWithMetadataFactor(values, "co2_", channelNumber, startPoint + sampleOffset, CO2_EQUIVALENT_METADATA_FACTOR);
|
||||
break;
|
||||
case MEASUREMENT_TYPE_BREATH_VOC:
|
||||
addPulseCounterProperties(values, "breath_voc_", channelNumber, startPoint + sampleOffset, BREATH_VOC_METADATA_FACTOR);
|
||||
addPropertiesForMeasurementTypeWithMetadataFactor(values, "breath_voc_", channelNumber, startPoint + sampleOffset, BREATH_VOC_METADATA_FACTOR);
|
||||
break;
|
||||
case MEASUREMENT_TYPE_PERCENTAGE:
|
||||
values.addProperty("percentage_" + channelNumber, (double) (startPoint + sampleOffset) / 100f);
|
||||
@ -386,22 +382,22 @@ public class CoapEfentoTransportResource extends AbstractCoapTransportResource {
|
||||
values.addProperty("current_" + channelNumber, (double) (startPoint + sampleOffset) / 100f);
|
||||
break;
|
||||
case MEASUREMENT_TYPE_PULSE_CNT_ACC_MINOR:
|
||||
addPulseCounterProperties(values, "pulse_cnt_acc_minor_", channelNumber, startPoint + sampleOffset, PULSE_CNT_ACC_MINOR_METADATA_FACTOR);
|
||||
calculateAccPulseCounterTotalValue(values, PULSE_CNT_ACC , channelNumber, startPoint + sampleOffset, PULSE_CNT_ACC_MINOR_METADATA_FACTOR);
|
||||
break;
|
||||
case MEASUREMENT_TYPE_PULSE_CNT_ACC_MAJOR:
|
||||
addPulseCounterProperties(values, "pulse_cnt_acc_major_", channelNumber, startPoint + sampleOffset, PULSE_CNT_ACC_MAJOR_METADATA_FACTOR);
|
||||
addPropertiesForMeasurementTypeWithMetadataFactor(values, PULSE_CNT_ACC.getPrefix(), channelNumber, startPoint + sampleOffset, PULSE_CNT_ACC_MAJOR_METADATA_FACTOR);
|
||||
break;
|
||||
case MEASUREMENT_TYPE_ELEC_METER_ACC_MINOR:
|
||||
addPulseCounterProperties(values, "elec_meter_acc_minor_", channelNumber, startPoint + sampleOffset, ELEC_METER_ACC_MINOR_METADATA_FACTOR);
|
||||
calculateAccPulseCounterTotalValue(values, ELEC_METER_ACC , channelNumber, startPoint + sampleOffset, ELEC_METER_ACC_MINOR_METADATA_FACTOR);
|
||||
break;
|
||||
case MEASUREMENT_TYPE_ELEC_METER_ACC_MAJOR:
|
||||
addPulseCounterProperties(values, "elec_meter_acc_major_", channelNumber, startPoint + sampleOffset, ELEC_METER_ACC_MAJOR_METADATA_FACTOR);
|
||||
addPropertiesForMeasurementTypeWithMetadataFactor(values, ELEC_METER_ACC.getPrefix(), channelNumber, startPoint + sampleOffset, ELEC_METER_ACC_MAJOR_METADATA_FACTOR);
|
||||
break;
|
||||
case MEASUREMENT_TYPE_PULSE_CNT_ACC_WIDE_MINOR:
|
||||
addPulseCounterProperties(values, "pulse_cnt_acc_wide_minor_", channelNumber, startPoint + sampleOffset, PULSE_CNT_ACC_WIDE_MINOR_METADATA_FACTOR);
|
||||
calculateAccPulseCounterTotalValue(values, PULSE_CNT_ACC_WIDE , channelNumber, startPoint + sampleOffset, PULSE_CNT_ACC_WIDE_MINOR_METADATA_FACTOR);
|
||||
break;
|
||||
case MEASUREMENT_TYPE_PULSE_CNT_ACC_WIDE_MAJOR:
|
||||
addPulseCounterProperties(values, "pulse_cnt_acc_wide_major_", channelNumber, startPoint + sampleOffset, PULSE_CNT_ACC_WIDE_MAJOR_METADATA_FACTOR);
|
||||
addPropertiesForMeasurementTypeWithMetadataFactor(values, PULSE_CNT_ACC_WIDE.getPrefix(), channelNumber, startPoint + sampleOffset, PULSE_CNT_ACC_WIDE_MAJOR_METADATA_FACTOR);
|
||||
break;
|
||||
case MEASUREMENT_TYPE_CURRENT_PRECISE:
|
||||
values.addProperty("current_precise_" + channelNumber, (double) (startPoint + sampleOffset) / 1000f);
|
||||
@ -416,17 +412,20 @@ public class CoapEfentoTransportResource extends AbstractCoapTransportResource {
|
||||
}
|
||||
}
|
||||
|
||||
private void addPulseCounterProperties(JsonObject values, String prefix, int channelNumber, int value, int metadataFactor) {
|
||||
private void addPropertiesForMeasurementTypeWithMetadataFactor(JsonObject values, String prefix, int channelNumber, int value, int metadataFactor) {
|
||||
values.addProperty(prefix + channelNumber, value / metadataFactor);
|
||||
values.addProperty(prefix + "metadata_" + channelNumber, value % metadataFactor);
|
||||
}
|
||||
|
||||
private void calculatePulseCounterTotalValue(JsonObject value, PulseCounterType pulseCounterType) {
|
||||
Set<String> keys = value.keySet();
|
||||
Optional<String> major = keys.stream().filter(s -> s.startsWith(pulseCounterType.getPrefix() + "major_")).findAny();
|
||||
Optional<String> minor = keys.stream().filter(s -> s.startsWith(pulseCounterType.getPrefix() + "minor_")).findAny();
|
||||
if (major.isPresent() && minor.isPresent()) {
|
||||
value.addProperty(pulseCounterType.getPrefix() + "total_value", value.get(major.get()).getAsInt() * pulseCounterType.getMajorResolution() + value.get(minor.get()).getAsInt());
|
||||
private void calculateAccPulseCounterTotalValue(JsonObject values, PulseCounterType pulseCounterType, int channelNumber, int value, int metadataFactor) {
|
||||
int minorValue = value / metadataFactor;
|
||||
int majorChannel = value % metadataFactor + 1;
|
||||
String majorPropertyKey = pulseCounterType.getPrefix() + majorChannel;
|
||||
JsonElement majorProperty = values.get(majorPropertyKey);
|
||||
if (majorProperty != null) {
|
||||
int totalValue = majorProperty.getAsInt() * pulseCounterType.getMajorResolution() + minorValue;
|
||||
values.addProperty(pulseCounterType.getPrefix() + "total_" + channelNumber, totalValue);
|
||||
values.remove(majorPropertyKey);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -29,6 +29,7 @@ import org.thingsboard.server.transport.coap.efento.utils.CoapEfentoUtils;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Instant;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -172,7 +173,7 @@ class CoapEfentoTransportResourceTest {
|
||||
@ParameterizedTest
|
||||
@MethodSource
|
||||
void checkPulseCounterSensors(MeasurementType minorType, List<Integer> minorSampleOffsets, MeasurementType majorType, List<Integer> majorSampleOffsets,
|
||||
String propertyPrefix, double expectedValue) {
|
||||
String totalPropertyName, double expectedTotalValue) {
|
||||
long tsInSec = Instant.now().getEpochSecond();
|
||||
ProtoMeasurements measurements = ProtoMeasurements.newBuilder()
|
||||
.setSerialNum(integerToByteString(1234))
|
||||
@ -184,34 +185,34 @@ class CoapEfentoTransportResourceTest {
|
||||
.setNextTransmissionAt(1000)
|
||||
.setTransferReason(0)
|
||||
.setHash(0)
|
||||
.addAllChannels(List.of(MeasurementsProtos.ProtoChannel.newBuilder()
|
||||
.setType(minorType)
|
||||
.setTimestamp(Math.toIntExact(tsInSec))
|
||||
.addAllSampleOffsets(minorSampleOffsets)
|
||||
.build(),
|
||||
MeasurementsProtos.ProtoChannel.newBuilder()
|
||||
.addAllChannels(Arrays.asList(MeasurementsProtos.ProtoChannel.newBuilder()
|
||||
.setType(majorType)
|
||||
.setTimestamp(Math.toIntExact(tsInSec))
|
||||
.addAllSampleOffsets(majorSampleOffsets)
|
||||
.build(),
|
||||
MeasurementsProtos.ProtoChannel.newBuilder()
|
||||
.setType(minorType)
|
||||
.setTimestamp(Math.toIntExact(tsInSec))
|
||||
.addAllSampleOffsets(minorSampleOffsets)
|
||||
.build()))
|
||||
.build();
|
||||
List<CoapEfentoTransportResource.EfentoTelemetry> efentoMeasurements = coapEfentoTransportResource.getEfentoMeasurements(measurements, UUID.randomUUID());
|
||||
assertThat(efentoMeasurements).hasSize(1);
|
||||
assertThat(efentoMeasurements.get(0).getTs()).isEqualTo(tsInSec * 1000);
|
||||
assertThat(efentoMeasurements.get(0).getValues().getAsJsonObject().get(propertyPrefix + "_total_value").getAsDouble()).isEqualTo(expectedValue);
|
||||
assertThat(efentoMeasurements.get(0).getValues().getAsJsonObject().get(totalPropertyName + "_2").getAsDouble()).isEqualTo(expectedTotalValue);
|
||||
checkDefaultMeasurements(measurements, efentoMeasurements, 180, false);
|
||||
}
|
||||
|
||||
private static Stream<Arguments> checkPulseCounterSensors() {
|
||||
return Stream.of(
|
||||
Arguments.of(MEASUREMENT_TYPE_WATER_METER_ACC_MINOR, List.of(125), MEASUREMENT_TYPE_WATER_METER_ACC_MAJOR,
|
||||
List.of(2500), "water_cnt_acc", 62520.0),
|
||||
Arguments.of(MEASUREMENT_TYPE_PULSE_CNT_ACC_MINOR, List.of(180), MEASUREMENT_TYPE_PULSE_CNT_ACC_MAJOR,
|
||||
List.of(1200), "pulse_cnt_acc", 300030.0),
|
||||
Arguments.of(MEASUREMENT_TYPE_ELEC_METER_ACC_MINOR, List.of(550), MEASUREMENT_TYPE_ELEC_METER_ACC_MAJOR,
|
||||
List.of(5500), "elec_meter_acc", 1375091.0),
|
||||
Arguments.of(MEASUREMENT_TYPE_PULSE_CNT_ACC_WIDE_MINOR, List.of(230), MEASUREMENT_TYPE_PULSE_CNT_ACC_WIDE_MAJOR,
|
||||
List.of(1700), "pulse_cnt_acc_wide", 425000038.0));
|
||||
Arguments.of(MEASUREMENT_TYPE_WATER_METER_ACC_MINOR, List.of(15*6), MEASUREMENT_TYPE_WATER_METER_ACC_MAJOR,
|
||||
List.of(625*4), "water_cnt_acc_total", 625.0*100 + 15),
|
||||
Arguments.of(MEASUREMENT_TYPE_PULSE_CNT_ACC_MINOR, List.of(10*6), MEASUREMENT_TYPE_PULSE_CNT_ACC_MAJOR,
|
||||
List.of(300*4), "pulse_cnt_acc_total", 300.0*1000 + 10),
|
||||
Arguments.of(MEASUREMENT_TYPE_ELEC_METER_ACC_MINOR, List.of(12*6), MEASUREMENT_TYPE_ELEC_METER_ACC_MAJOR,
|
||||
List.of(100*4), "elec_meter_acc_total", 100.0*1000 + 12),
|
||||
Arguments.of(MEASUREMENT_TYPE_PULSE_CNT_ACC_WIDE_MINOR, List.of(13*6), MEASUREMENT_TYPE_PULSE_CNT_ACC_WIDE_MAJOR,
|
||||
List.of(440*4), "pulse_cnt_acc_wide_total", 440.0*1000000 + 13));
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -203,7 +203,7 @@ public class DefaultGitRepositoryService implements GitRepositoryService {
|
||||
GitRepository gitRepository = Optional.ofNullable(repositories.get(tenantId))
|
||||
.orElseThrow(() -> new IllegalStateException("Repository is not initialized"));
|
||||
|
||||
if (!Files.exists(Path.of(gitRepository.getDirectory()))) {
|
||||
if (!GitRepository.exists(gitRepository.getDirectory())) {
|
||||
try {
|
||||
return openOrCloneRepository(tenantId, gitRepository.getSettings(), false);
|
||||
} catch (Exception e) {
|
||||
@ -285,12 +285,13 @@ public class DefaultGitRepositoryService implements GitRepositoryService {
|
||||
Path repositoryDirectory = Path.of(repositoriesFolder, settings.isLocalOnly() ? "local_" + settings.getRepositoryUri() : tenantId.getId().toString());
|
||||
|
||||
GitRepository repository;
|
||||
if (Files.exists(repositoryDirectory)) {
|
||||
if (GitRepository.exists(repositoryDirectory.toString())) {
|
||||
repository = GitRepository.open(repositoryDirectory.toFile(), settings);
|
||||
if (fetch) {
|
||||
repository.fetch();
|
||||
}
|
||||
} else {
|
||||
FileUtils.deleteDirectory(repositoryDirectory.toFile());
|
||||
Files.createDirectories(repositoryDirectory);
|
||||
if (settings.isLocalOnly()) {
|
||||
repository = GitRepository.create(settings, repositoryDirectory.toFile());
|
||||
|
||||
@ -21,6 +21,7 @@ import com.google.common.collect.Streams;
|
||||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@ -75,6 +76,7 @@ import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.security.KeyPair;
|
||||
import java.security.PublicKey;
|
||||
import java.util.ArrayList;
|
||||
@ -409,6 +411,12 @@ public class GitRepository {
|
||||
return result;
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
public static boolean exists(String directory) {
|
||||
File gitDirectory = Path.of(directory, ".git").toFile();
|
||||
return FileUtils.isDirectory(gitDirectory) && !FileUtils.isEmptyDirectory(gitDirectory);
|
||||
}
|
||||
|
||||
private <C extends GitCommand<T>, T> T execute(C command) throws GitAPIException {
|
||||
if (command instanceof TransportCommand transportCommand && authHandler != null) {
|
||||
authHandler.configureCommand(transportCommand);
|
||||
|
||||
@ -47,8 +47,7 @@ public abstract class AbstractVersionedInsertRepository<T> extends AbstractInser
|
||||
|
||||
List<Integer> toInsertIndexes = new ArrayList<>(notUpdatedCount);
|
||||
List<T> insertEntities = new ArrayList<>(notUpdatedCount);
|
||||
int keyHolderIndex = 0;
|
||||
for (int i = 0; i < updateResult.length; i++) {
|
||||
for (int i = 0, keyHolderIndex = 0; i < updateResult.length; i++) {
|
||||
if (updateResult[i] == 0) {
|
||||
insertEntities.add(entities.get(i));
|
||||
seqNumbers.add(null);
|
||||
@ -67,9 +66,10 @@ public abstract class AbstractVersionedInsertRepository<T> extends AbstractInser
|
||||
|
||||
seqNumbersList = keyHolder.getKeyList();
|
||||
|
||||
for (int i = 0; i < insertResult.length; i++) {
|
||||
for (int i = 0, keyHolderIndex = 0; i < insertResult.length; i++) {
|
||||
if (insertResult[i] != 0) {
|
||||
seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(VERSION_COLUMN));
|
||||
seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(keyHolderIndex).get(VERSION_COLUMN));
|
||||
keyHolderIndex++;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,115 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.sqlts;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.dao.service.AbstractServiceTest;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@DaoSqlTest
|
||||
public class SqlTimeseriesLatestDaoTest extends AbstractServiceTest {
|
||||
|
||||
@Autowired
|
||||
private TimeseriesLatestDao timeseriesLatestDao;
|
||||
|
||||
@Test
|
||||
public void saveLatestTest() throws Exception {
|
||||
DeviceId deviceId = new DeviceId(UUID.randomUUID());
|
||||
var entry = createEntry("key", 1000);
|
||||
Long version = timeseriesLatestDao.saveLatest(tenantId, deviceId, entry).get();
|
||||
assertNotNull(version);
|
||||
assertTrue(version > 0);
|
||||
|
||||
TsKvEntry foundEntry = timeseriesLatestDao.findLatest(tenantId, deviceId, "key").get();
|
||||
assertNotNull(foundEntry);
|
||||
equalsIgnoreVersion(entry, foundEntry);
|
||||
assertEquals(version, foundEntry.getVersion());
|
||||
|
||||
var updatedEntry = createEntry("key", 2000);
|
||||
Long updatedVersion = timeseriesLatestDao.saveLatest(tenantId, deviceId, updatedEntry).get();
|
||||
assertNotNull(updatedVersion);
|
||||
assertTrue(updatedVersion > version);
|
||||
|
||||
foundEntry = timeseriesLatestDao.findLatest(tenantId, deviceId, "key").get();
|
||||
assertNotNull(foundEntry);
|
||||
equalsIgnoreVersion(updatedEntry, foundEntry);
|
||||
assertEquals(updatedVersion, foundEntry.getVersion());
|
||||
|
||||
var oldEntry = createEntry("key", 1);
|
||||
Long oldVersion = timeseriesLatestDao.saveLatest(tenantId, deviceId, oldEntry).get();
|
||||
assertNull(oldVersion);
|
||||
|
||||
foundEntry = timeseriesLatestDao.findLatest(tenantId, deviceId, "key").get();
|
||||
assertNotNull(foundEntry);
|
||||
equalsIgnoreVersion(updatedEntry, foundEntry);
|
||||
assertEquals(updatedVersion, foundEntry.getVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void updateWithOldTsTest() throws Exception {
|
||||
DeviceId deviceId = new DeviceId(UUID.randomUUID());
|
||||
int n = 50;
|
||||
for (int i = 0; i < n; i++) {
|
||||
timeseriesLatestDao.saveLatest(tenantId, deviceId, createEntry("key_" + i, System.currentTimeMillis()));
|
||||
}
|
||||
|
||||
List<ListenableFuture<Long>> futures = new ArrayList<>(n);
|
||||
|
||||
for (int i = 0; i < n; i++) {
|
||||
long ts = i % 2 == 0 ? System.currentTimeMillis() : 1000;
|
||||
futures.add(timeseriesLatestDao.saveLatest(tenantId, deviceId, createEntry("key_" + i, ts)));
|
||||
}
|
||||
|
||||
for (int i = 0; i < futures.size(); i++) {
|
||||
Long version = futures.get(i).get();
|
||||
if (i % 2 == 0) {
|
||||
assertNotNull(version);
|
||||
assertTrue(version > 0);
|
||||
} else {
|
||||
assertNull(version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private TsKvEntry createEntry(String key, long ts) {
|
||||
return new BasicTsKvEntry(ts, new StringDataEntry(key, RandomStringUtils.random(10)));
|
||||
}
|
||||
|
||||
private void equalsIgnoreVersion(TsKvEntry expected, TsKvEntry actual) {
|
||||
Assert.assertEquals(expected.getKey(), actual.getKey());
|
||||
Assert.assertEquals(expected.getValue(), actual.getValue());
|
||||
Assert.assertEquals(expected.getTs(), actual.getTs());
|
||||
}
|
||||
|
||||
}
|
||||
2
pom.xml
2
pom.xml
@ -83,7 +83,7 @@
|
||||
<zookeeper.version>3.9.2</zookeeper.version>
|
||||
<protobuf.version>3.25.3</protobuf.version> <!-- A Major v4 does not support by the pubsub yet-->
|
||||
<grpc.version>1.63.0</grpc.version>
|
||||
<tbel.version>1.2.3</tbel.version>
|
||||
<tbel.version>1.2.4</tbel.version>
|
||||
<lombok.version>1.18.32</lombok.version>
|
||||
<paho.client.version>1.2.5</paho.client.version>
|
||||
<paho.mqttv5.client.version>1.2.5</paho.mqttv5.client.version>
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user