diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResource.java index 9b8da62f00..1755335118 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResource.java @@ -53,8 +53,6 @@ import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -80,6 +78,10 @@ import static org.thingsboard.server.transport.coap.efento.utils.CoapEfentoUtils import static org.thingsboard.server.transport.coap.efento.utils.CoapEfentoUtils.WATER_METER_ACC_MINOR_METADATA_FACTOR; import static org.thingsboard.server.transport.coap.efento.utils.CoapEfentoUtils.isBinarySensor; import static org.thingsboard.server.transport.coap.efento.utils.CoapEfentoUtils.isSensorError; +import static org.thingsboard.server.transport.coap.efento.utils.PulseCounterType.ELEC_METER_ACC; +import static org.thingsboard.server.transport.coap.efento.utils.PulseCounterType.PULSE_CNT_ACC; +import static org.thingsboard.server.transport.coap.efento.utils.PulseCounterType.PULSE_CNT_ACC_WIDE; +import static org.thingsboard.server.transport.coap.efento.utils.PulseCounterType.WATER_CNT_ACC; @Slf4j public class CoapEfentoTransportResource extends AbstractCoapTransportResource { @@ -300,12 +302,6 @@ public class CoapEfentoTransportResource extends AbstractCoapTransportResource { } } - valuesMap.values().forEach(jsonObject -> { - for (PulseCounterType pulseCounterType : PulseCounterType.values()) { - calculatePulseCounterTotalValue(jsonObject, pulseCounterType); - } - }); - if (CollectionUtils.isEmpty(valuesMap)) { throw new IllegalStateException("[" + sessionId + "]: Failed to collect Efento measurements, reason, values map is empty!"); } @@ -338,7 +334,7 @@ public class CoapEfentoTransportResource extends AbstractCoapTransportResource { values.addProperty("pulse_cnt_" + channelNumber, (double) (startPoint + sampleOffset)); break; case MEASUREMENT_TYPE_IAQ: - addPulseCounterProperties(values, "iaq_", channelNumber, startPoint + sampleOffset, IAQ_METADATA_FACTOR); + addPropertiesForMeasurementTypeWithMetadataFactor(values, "iaq_", channelNumber, startPoint + sampleOffset, IAQ_METADATA_FACTOR); break; case MEASUREMENT_TYPE_ELECTRICITY_METER: values.addProperty("watt_hour_" + channelNumber, (double) (startPoint + sampleOffset)); @@ -356,25 +352,25 @@ public class CoapEfentoTransportResource extends AbstractCoapTransportResource { values.addProperty("distance_mm_" + channelNumber, (double) (startPoint + sampleOffset)); break; case MEASUREMENT_TYPE_WATER_METER_ACC_MINOR: - addPulseCounterProperties(values, "water_cnt_acc_minor_", channelNumber, startPoint + sampleOffset, WATER_METER_ACC_MINOR_METADATA_FACTOR); + calculateAccPulseCounterTotalValue(values, WATER_CNT_ACC , channelNumber, startPoint + sampleOffset, WATER_METER_ACC_MINOR_METADATA_FACTOR); break; case MEASUREMENT_TYPE_WATER_METER_ACC_MAJOR: - addPulseCounterProperties(values, "water_cnt_acc_major_", channelNumber, startPoint + sampleOffset, WATER_METER_ACC_MAJOR_METADATA_FACTOR); + addPropertiesForMeasurementTypeWithMetadataFactor(values, WATER_CNT_ACC.getPrefix(), channelNumber, startPoint + sampleOffset, WATER_METER_ACC_MAJOR_METADATA_FACTOR); break; case MEASUREMENT_TYPE_HUMIDITY_ACCURATE: values.addProperty("humidity_relative_" + channelNumber, (double) (startPoint + sampleOffset) / 10f); break; case MEASUREMENT_TYPE_STATIC_IAQ: - addPulseCounterProperties(values, "static_iaq_", channelNumber, startPoint + sampleOffset, STATIC_IAQ_METADATA_FACTOR); + addPropertiesForMeasurementTypeWithMetadataFactor(values, "static_iaq_", channelNumber, startPoint + sampleOffset, STATIC_IAQ_METADATA_FACTOR); break; case MEASUREMENT_TYPE_CO2_GAS: - addPulseCounterProperties(values, "co2_gas_", channelNumber, startPoint + sampleOffset, CO2_GAS_METADATA_FACTOR); + addPropertiesForMeasurementTypeWithMetadataFactor(values, "co2_gas_", channelNumber, startPoint + sampleOffset, CO2_GAS_METADATA_FACTOR); break; case MEASUREMENT_TYPE_CO2_EQUIVALENT: - addPulseCounterProperties(values, "co2_", channelNumber, startPoint + sampleOffset, CO2_EQUIVALENT_METADATA_FACTOR); + addPropertiesForMeasurementTypeWithMetadataFactor(values, "co2_", channelNumber, startPoint + sampleOffset, CO2_EQUIVALENT_METADATA_FACTOR); break; case MEASUREMENT_TYPE_BREATH_VOC: - addPulseCounterProperties(values, "breath_voc_", channelNumber, startPoint + sampleOffset, BREATH_VOC_METADATA_FACTOR); + addPropertiesForMeasurementTypeWithMetadataFactor(values, "breath_voc_", channelNumber, startPoint + sampleOffset, BREATH_VOC_METADATA_FACTOR); break; case MEASUREMENT_TYPE_PERCENTAGE: values.addProperty("percentage_" + channelNumber, (double) (startPoint + sampleOffset) / 100f); @@ -386,22 +382,22 @@ public class CoapEfentoTransportResource extends AbstractCoapTransportResource { values.addProperty("current_" + channelNumber, (double) (startPoint + sampleOffset) / 100f); break; case MEASUREMENT_TYPE_PULSE_CNT_ACC_MINOR: - addPulseCounterProperties(values, "pulse_cnt_acc_minor_", channelNumber, startPoint + sampleOffset, PULSE_CNT_ACC_MINOR_METADATA_FACTOR); + calculateAccPulseCounterTotalValue(values, PULSE_CNT_ACC , channelNumber, startPoint + sampleOffset, PULSE_CNT_ACC_MINOR_METADATA_FACTOR); break; case MEASUREMENT_TYPE_PULSE_CNT_ACC_MAJOR: - addPulseCounterProperties(values, "pulse_cnt_acc_major_", channelNumber, startPoint + sampleOffset, PULSE_CNT_ACC_MAJOR_METADATA_FACTOR); + addPropertiesForMeasurementTypeWithMetadataFactor(values, PULSE_CNT_ACC.getPrefix(), channelNumber, startPoint + sampleOffset, PULSE_CNT_ACC_MAJOR_METADATA_FACTOR); break; case MEASUREMENT_TYPE_ELEC_METER_ACC_MINOR: - addPulseCounterProperties(values, "elec_meter_acc_minor_", channelNumber, startPoint + sampleOffset, ELEC_METER_ACC_MINOR_METADATA_FACTOR); + calculateAccPulseCounterTotalValue(values, ELEC_METER_ACC , channelNumber, startPoint + sampleOffset, ELEC_METER_ACC_MINOR_METADATA_FACTOR); break; case MEASUREMENT_TYPE_ELEC_METER_ACC_MAJOR: - addPulseCounterProperties(values, "elec_meter_acc_major_", channelNumber, startPoint + sampleOffset, ELEC_METER_ACC_MAJOR_METADATA_FACTOR); + addPropertiesForMeasurementTypeWithMetadataFactor(values, ELEC_METER_ACC.getPrefix(), channelNumber, startPoint + sampleOffset, ELEC_METER_ACC_MAJOR_METADATA_FACTOR); break; case MEASUREMENT_TYPE_PULSE_CNT_ACC_WIDE_MINOR: - addPulseCounterProperties(values, "pulse_cnt_acc_wide_minor_", channelNumber, startPoint + sampleOffset, PULSE_CNT_ACC_WIDE_MINOR_METADATA_FACTOR); + calculateAccPulseCounterTotalValue(values, PULSE_CNT_ACC_WIDE , channelNumber, startPoint + sampleOffset, PULSE_CNT_ACC_WIDE_MINOR_METADATA_FACTOR); break; case MEASUREMENT_TYPE_PULSE_CNT_ACC_WIDE_MAJOR: - addPulseCounterProperties(values, "pulse_cnt_acc_wide_major_", channelNumber, startPoint + sampleOffset, PULSE_CNT_ACC_WIDE_MAJOR_METADATA_FACTOR); + addPropertiesForMeasurementTypeWithMetadataFactor(values, PULSE_CNT_ACC_WIDE.getPrefix(), channelNumber, startPoint + sampleOffset, PULSE_CNT_ACC_WIDE_MAJOR_METADATA_FACTOR); break; case MEASUREMENT_TYPE_CURRENT_PRECISE: values.addProperty("current_precise_" + channelNumber, (double) (startPoint + sampleOffset) / 1000f); @@ -416,17 +412,20 @@ public class CoapEfentoTransportResource extends AbstractCoapTransportResource { } } - private void addPulseCounterProperties(JsonObject values, String prefix, int channelNumber, int value, int metadataFactor) { + private void addPropertiesForMeasurementTypeWithMetadataFactor(JsonObject values, String prefix, int channelNumber, int value, int metadataFactor) { values.addProperty(prefix + channelNumber, value / metadataFactor); values.addProperty(prefix + "metadata_" + channelNumber, value % metadataFactor); } - private void calculatePulseCounterTotalValue(JsonObject value, PulseCounterType pulseCounterType) { - Set keys = value.keySet(); - Optional major = keys.stream().filter(s -> s.startsWith(pulseCounterType.getPrefix() + "major_")).findAny(); - Optional minor = keys.stream().filter(s -> s.startsWith(pulseCounterType.getPrefix() + "minor_")).findAny(); - if (major.isPresent() && minor.isPresent()) { - value.addProperty(pulseCounterType.getPrefix() + "total_value", value.get(major.get()).getAsInt() * pulseCounterType.getMajorResolution() + value.get(minor.get()).getAsInt()); + private void calculateAccPulseCounterTotalValue(JsonObject values, PulseCounterType pulseCounterType, int channelNumber, int value, int metadataFactor) { + int minorValue = value / metadataFactor; + int majorChannel = value % metadataFactor + 1; + String majorPropertyKey = pulseCounterType.getPrefix() + majorChannel; + JsonElement majorProperty = values.get(majorPropertyKey); + if (majorProperty != null) { + int totalValue = majorProperty.getAsInt() * pulseCounterType.getMajorResolution() + minorValue; + values.addProperty(pulseCounterType.getPrefix() + "total_" + channelNumber, totalValue); + values.remove(majorPropertyKey); } } diff --git a/common/transport/coap/src/test/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResourceTest.java b/common/transport/coap/src/test/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResourceTest.java index d61712cdf6..602e7fffc7 100644 --- a/common/transport/coap/src/test/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResourceTest.java +++ b/common/transport/coap/src/test/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResourceTest.java @@ -29,6 +29,7 @@ import org.thingsboard.server.transport.coap.efento.utils.CoapEfentoUtils; import java.nio.ByteBuffer; import java.time.Instant; +import java.util.Arrays; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -172,7 +173,7 @@ class CoapEfentoTransportResourceTest { @ParameterizedTest @MethodSource void checkPulseCounterSensors(MeasurementType minorType, List minorSampleOffsets, MeasurementType majorType, List majorSampleOffsets, - String propertyPrefix, double expectedValue) { + String totalPropertyName, double expectedTotalValue) { long tsInSec = Instant.now().getEpochSecond(); ProtoMeasurements measurements = ProtoMeasurements.newBuilder() .setSerialNum(integerToByteString(1234)) @@ -184,34 +185,34 @@ class CoapEfentoTransportResourceTest { .setNextTransmissionAt(1000) .setTransferReason(0) .setHash(0) - .addAllChannels(List.of(MeasurementsProtos.ProtoChannel.newBuilder() - .setType(minorType) - .setTimestamp(Math.toIntExact(tsInSec)) - .addAllSampleOffsets(minorSampleOffsets) - .build(), - MeasurementsProtos.ProtoChannel.newBuilder() + .addAllChannels(Arrays.asList(MeasurementsProtos.ProtoChannel.newBuilder() .setType(majorType) .setTimestamp(Math.toIntExact(tsInSec)) .addAllSampleOffsets(majorSampleOffsets) + .build(), + MeasurementsProtos.ProtoChannel.newBuilder() + .setType(minorType) + .setTimestamp(Math.toIntExact(tsInSec)) + .addAllSampleOffsets(minorSampleOffsets) .build())) .build(); List efentoMeasurements = coapEfentoTransportResource.getEfentoMeasurements(measurements, UUID.randomUUID()); assertThat(efentoMeasurements).hasSize(1); assertThat(efentoMeasurements.get(0).getTs()).isEqualTo(tsInSec * 1000); - assertThat(efentoMeasurements.get(0).getValues().getAsJsonObject().get(propertyPrefix + "_total_value").getAsDouble()).isEqualTo(expectedValue); + assertThat(efentoMeasurements.get(0).getValues().getAsJsonObject().get(totalPropertyName + "_2").getAsDouble()).isEqualTo(expectedTotalValue); checkDefaultMeasurements(measurements, efentoMeasurements, 180, false); } private static Stream checkPulseCounterSensors() { return Stream.of( - Arguments.of(MEASUREMENT_TYPE_WATER_METER_ACC_MINOR, List.of(125), MEASUREMENT_TYPE_WATER_METER_ACC_MAJOR, - List.of(2500), "water_cnt_acc", 62520.0), - Arguments.of(MEASUREMENT_TYPE_PULSE_CNT_ACC_MINOR, List.of(180), MEASUREMENT_TYPE_PULSE_CNT_ACC_MAJOR, - List.of(1200), "pulse_cnt_acc", 300030.0), - Arguments.of(MEASUREMENT_TYPE_ELEC_METER_ACC_MINOR, List.of(550), MEASUREMENT_TYPE_ELEC_METER_ACC_MAJOR, - List.of(5500), "elec_meter_acc", 1375091.0), - Arguments.of(MEASUREMENT_TYPE_PULSE_CNT_ACC_WIDE_MINOR, List.of(230), MEASUREMENT_TYPE_PULSE_CNT_ACC_WIDE_MAJOR, - List.of(1700), "pulse_cnt_acc_wide", 425000038.0)); + Arguments.of(MEASUREMENT_TYPE_WATER_METER_ACC_MINOR, List.of(15*6), MEASUREMENT_TYPE_WATER_METER_ACC_MAJOR, + List.of(625*4), "water_cnt_acc_total", 625.0*100 + 15), + Arguments.of(MEASUREMENT_TYPE_PULSE_CNT_ACC_MINOR, List.of(10*6), MEASUREMENT_TYPE_PULSE_CNT_ACC_MAJOR, + List.of(300*4), "pulse_cnt_acc_total", 300.0*1000 + 10), + Arguments.of(MEASUREMENT_TYPE_ELEC_METER_ACC_MINOR, List.of(12*6), MEASUREMENT_TYPE_ELEC_METER_ACC_MAJOR, + List.of(100*4), "elec_meter_acc_total", 100.0*1000 + 12), + Arguments.of(MEASUREMENT_TYPE_PULSE_CNT_ACC_WIDE_MINOR, List.of(13*6), MEASUREMENT_TYPE_PULSE_CNT_ACC_WIDE_MAJOR, + List.of(440*4), "pulse_cnt_acc_wide_total", 440.0*1000000 + 13)); } diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitRepositoryService.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitRepositoryService.java index 0788143030..2aa31e194d 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitRepositoryService.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitRepositoryService.java @@ -203,7 +203,7 @@ public class DefaultGitRepositoryService implements GitRepositoryService { GitRepository gitRepository = Optional.ofNullable(repositories.get(tenantId)) .orElseThrow(() -> new IllegalStateException("Repository is not initialized")); - if (!Files.exists(Path.of(gitRepository.getDirectory()))) { + if (!GitRepository.exists(gitRepository.getDirectory())) { try { return openOrCloneRepository(tenantId, gitRepository.getSettings(), false); } catch (Exception e) { @@ -285,12 +285,13 @@ public class DefaultGitRepositoryService implements GitRepositoryService { Path repositoryDirectory = Path.of(repositoriesFolder, settings.isLocalOnly() ? "local_" + settings.getRepositoryUri() : tenantId.getId().toString()); GitRepository repository; - if (Files.exists(repositoryDirectory)) { + if (GitRepository.exists(repositoryDirectory.toString())) { repository = GitRepository.open(repositoryDirectory.toFile(), settings); if (fetch) { repository.fetch(); } } else { + FileUtils.deleteDirectory(repositoryDirectory.toFile()); Files.createDirectories(repositoryDirectory); if (settings.isLocalOnly()) { repository = GitRepository.create(settings, repositoryDirectory.toFile()); diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepository.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepository.java index aaa4c22289..2ff6a97c21 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepository.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/GitRepository.java @@ -21,6 +21,7 @@ import com.google.common.collect.Streams; import lombok.Data; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; @@ -75,6 +76,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.security.KeyPair; import java.security.PublicKey; import java.util.ArrayList; @@ -409,6 +411,12 @@ public class GitRepository { return result; } + @SneakyThrows + public static boolean exists(String directory) { + File gitDirectory = Path.of(directory, ".git").toFile(); + return FileUtils.isDirectory(gitDirectory) && !FileUtils.isEmptyDirectory(gitDirectory); + } + private , T> T execute(C command) throws GitAPIException { if (command instanceof TransportCommand transportCommand && authHandler != null) { authHandler.configureCommand(transportCommand); diff --git a/dao/src/main/java/org/thingsboard/server/dao/AbstractVersionedInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/AbstractVersionedInsertRepository.java index 43ed41e52a..a042bc32b1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/AbstractVersionedInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/AbstractVersionedInsertRepository.java @@ -47,8 +47,7 @@ public abstract class AbstractVersionedInsertRepository extends AbstractInser List toInsertIndexes = new ArrayList<>(notUpdatedCount); List insertEntities = new ArrayList<>(notUpdatedCount); - int keyHolderIndex = 0; - for (int i = 0; i < updateResult.length; i++) { + for (int i = 0, keyHolderIndex = 0; i < updateResult.length; i++) { if (updateResult[i] == 0) { insertEntities.add(entities.get(i)); seqNumbers.add(null); @@ -67,9 +66,10 @@ public abstract class AbstractVersionedInsertRepository extends AbstractInser seqNumbersList = keyHolder.getKeyList(); - for (int i = 0; i < insertResult.length; i++) { + for (int i = 0, keyHolderIndex = 0; i < insertResult.length; i++) { if (insertResult[i] != 0) { - seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(VERSION_COLUMN)); + seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(keyHolderIndex).get(VERSION_COLUMN)); + keyHolderIndex++; } } diff --git a/dao/src/test/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDaoTest.java new file mode 100644 index 0000000000..bddd2f4f8c --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDaoTest.java @@ -0,0 +1,115 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sqlts; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.dao.service.AbstractServiceTest; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@DaoSqlTest +public class SqlTimeseriesLatestDaoTest extends AbstractServiceTest { + + @Autowired + private TimeseriesLatestDao timeseriesLatestDao; + + @Test + public void saveLatestTest() throws Exception { + DeviceId deviceId = new DeviceId(UUID.randomUUID()); + var entry = createEntry("key", 1000); + Long version = timeseriesLatestDao.saveLatest(tenantId, deviceId, entry).get(); + assertNotNull(version); + assertTrue(version > 0); + + TsKvEntry foundEntry = timeseriesLatestDao.findLatest(tenantId, deviceId, "key").get(); + assertNotNull(foundEntry); + equalsIgnoreVersion(entry, foundEntry); + assertEquals(version, foundEntry.getVersion()); + + var updatedEntry = createEntry("key", 2000); + Long updatedVersion = timeseriesLatestDao.saveLatest(tenantId, deviceId, updatedEntry).get(); + assertNotNull(updatedVersion); + assertTrue(updatedVersion > version); + + foundEntry = timeseriesLatestDao.findLatest(tenantId, deviceId, "key").get(); + assertNotNull(foundEntry); + equalsIgnoreVersion(updatedEntry, foundEntry); + assertEquals(updatedVersion, foundEntry.getVersion()); + + var oldEntry = createEntry("key", 1); + Long oldVersion = timeseriesLatestDao.saveLatest(tenantId, deviceId, oldEntry).get(); + assertNull(oldVersion); + + foundEntry = timeseriesLatestDao.findLatest(tenantId, deviceId, "key").get(); + assertNotNull(foundEntry); + equalsIgnoreVersion(updatedEntry, foundEntry); + assertEquals(updatedVersion, foundEntry.getVersion()); + } + + @Test + public void updateWithOldTsTest() throws Exception { + DeviceId deviceId = new DeviceId(UUID.randomUUID()); + int n = 50; + for (int i = 0; i < n; i++) { + timeseriesLatestDao.saveLatest(tenantId, deviceId, createEntry("key_" + i, System.currentTimeMillis())); + } + + List> futures = new ArrayList<>(n); + + for (int i = 0; i < n; i++) { + long ts = i % 2 == 0 ? System.currentTimeMillis() : 1000; + futures.add(timeseriesLatestDao.saveLatest(tenantId, deviceId, createEntry("key_" + i, ts))); + } + + for (int i = 0; i < futures.size(); i++) { + Long version = futures.get(i).get(); + if (i % 2 == 0) { + assertNotNull(version); + assertTrue(version > 0); + } else { + assertNull(version); + } + } + } + + private TsKvEntry createEntry(String key, long ts) { + return new BasicTsKvEntry(ts, new StringDataEntry(key, RandomStringUtils.random(10))); + } + + private void equalsIgnoreVersion(TsKvEntry expected, TsKvEntry actual) { + Assert.assertEquals(expected.getKey(), actual.getKey()); + Assert.assertEquals(expected.getValue(), actual.getValue()); + Assert.assertEquals(expected.getTs(), actual.getTs()); + } + +} diff --git a/pom.xml b/pom.xml index 5e4cc50c7c..e4c7fa09ed 100755 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,7 @@ 3.9.2 3.25.3 1.63.0 - 1.2.3 + 1.2.4 1.18.32 1.2.5 1.2.5