Merge pull request #5821 from van-vanich/fix_issue_with_save_last_ts

[3.3.3] Add ignoreMetadataTs to TbMsgTimeseriesNode
This commit is contained in:
Igor Kulikov 2022-01-12 19:07:33 +02:00 committed by GitHub
commit 5e3f1315f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 225 additions and 3 deletions

View File

@ -23,6 +23,7 @@ import org.thingsboard.server.queue.memory.InMemoryStorage;
@RunWith(ClasspathSuite.class) @RunWith(ClasspathSuite.class)
@ClasspathSuite.ClassnameFilters({ @ClasspathSuite.ClassnameFilters({
"org.thingsboard.server.service.resource.sql.*Test", "org.thingsboard.server.service.resource.sql.*Test",
"org.thingsboard.server.service.sql.*Test"
}) })
public class ServiceSqlTestSuite { public class ServiceSqlTestSuite {

View File

@ -0,0 +1,207 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.sql;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode;
import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.controller.AbstractControllerTest;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@DaoSqlTest
public class SequentialTimeseriesPersistenceTest extends AbstractControllerTest {
static final int TIMEOUT = 30;
final String TOTALIZER = "Totalizer";
final int TTL = 99999;
final String GENERIC_CUMULATIVE_OBJ = "genericCumulativeObj";
final List<Long> ts = List.of(10L, 20L, 30L, 40L, 60L, 70L, 50L, 80L);
final List<Long> msgValue = List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
@Autowired
TimeseriesService timeseriesService;
TbMsgTimeseriesNodeConfiguration configuration;
Tenant savedTenant;
User tenantAdmin;
@Before
public void beforeTest() throws Exception {
configuration = new TbMsgTimeseriesNodeConfiguration();
configuration.setUseServerTs(true);
loginSysAdmin();
Tenant tenant = new Tenant();
tenant.setTitle("My tenant");
savedTenant = doPost("/api/tenant", tenant, Tenant.class);
Assert.assertNotNull(savedTenant);
tenantAdmin = new User();
tenantAdmin.setAuthority(Authority.TENANT_ADMIN);
tenantAdmin.setTenantId(savedTenant.getId());
tenantAdmin.setEmail("tenant2@thingsboard.org");
tenantAdmin.setFirstName("Joe");
tenantAdmin.setLastName("Downs");
tenantAdmin = createUserAndLogin(tenantAdmin, "testPassword1");
}
@After
public void afterTest() throws Exception {
loginSysAdmin();
doDelete("/api/tenant/" + savedTenant.getId().getId().toString()).andExpect(status().isOk());
}
@Test
public void testSequentialTimeseriesPersistence() throws Exception {
Asset asset = saveAsset("Asset");
Device deviceA = saveDevice("Device A");
Device deviceB = saveDevice("Device B");
Device deviceC = saveDevice("Device C");
Device deviceD = saveDevice("Device D");
List<Device> devices = List.of(deviceA, deviceB, deviceC, deviceD);
for (int i = 0; i < 2; i++) {
int idx = i * devices.size();
saveLatestTsForAssetAndDevice(devices, asset, idx);
checkDiffBetweenLatestTsForDevicesAndAsset(devices, asset);
}
}
Device saveDevice(String name) throws Exception {
Device device = new Device();
device.setName(name);
device.setType("default");
Device savedDevice = doPost("/api/device", device, Device.class);
Assert.assertNotNull(savedDevice);
return savedDevice;
}
Asset saveAsset(String name) throws Exception {
Asset asset = new Asset();
asset.setName(name);
asset.setType("default");
Asset savedAsset = doPost("/api/asset", asset, Asset.class);
Assert.assertNotNull(savedAsset);
return savedAsset;
}
void saveLatestTsForAssetAndDevice(List<Device> devices, Asset asset, int idx) throws ExecutionException, InterruptedException, TimeoutException {
for (Device device : devices) {
TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(),
device.getId(),
getTbMsgMetadata(device.getName(), ts.get(idx)),
TbMsgDataType.JSON,
getTbMsgData(msgValue.get(idx)));
saveDeviceTsEntry(device.getId(), tbMsg, msgValue.get(idx));
saveAssetTsEntry(asset, device.getName(), msgValue.get(idx), TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isUseServerTs()));
idx++;
}
}
void checkDiffBetweenLatestTsForDevicesAndAsset(List<Device> devices, Asset asset) throws ExecutionException, InterruptedException, TimeoutException {
TsKvEntry assetTsKvEntry = getTsKvLatest(asset.getId(), GENERIC_CUMULATIVE_OBJ);
Assert.assertTrue(assetTsKvEntry.getJsonValue().isPresent());
JsonObject assetJsonObject = new JsonParser().parse(assetTsKvEntry.getJsonValue().get()).getAsJsonObject();
for (Device device : devices) {
Long assetValue = assetJsonObject.get(device.getName()).getAsLong();
TsKvEntry deviceLatest = getTsKvLatest(device.getId(), TOTALIZER);
Assert.assertTrue(deviceLatest.getLongValue().isPresent());
Long deviceValue = deviceLatest.getLongValue().get();
Assert.assertEquals(assetValue, deviceValue);
}
}
String getTbMsgData(long value) {
return "{\"Totalizer\": " + value + "}";
}
TbMsgMetaData getTbMsgMetadata(String name, long ts) {
Map<String, String> metadata = new HashMap<>();
metadata.put("deviceName", name);
metadata.put("ts", String.valueOf(ts));
return new TbMsgMetaData(metadata);
}
void saveDeviceTsEntry(EntityId entityId, TbMsg tbMsg, long value) throws ExecutionException, InterruptedException, TimeoutException {
TsKvEntry tsKvEntry = new BasicTsKvEntry(TbMsgTimeseriesNode.computeTs(tbMsg, configuration.isUseServerTs()), new LongDataEntry(TOTALIZER, value));
saveTimeseries(entityId, tsKvEntry);
}
void saveAssetTsEntry(Asset asset, String key, long value, long ts) throws ExecutionException, InterruptedException, TimeoutException {
Optional<String> tsKvEntryOpt = getTsKvLatest(asset.getId(), GENERIC_CUMULATIVE_OBJ).getJsonValue();
TsKvEntry saveTsKvEntry = new BasicTsKvEntry(ts, new JsonDataEntry(GENERIC_CUMULATIVE_OBJ, getJsonObject(key, value, tsKvEntryOpt).toString()));
saveTimeseries(asset.getId(), saveTsKvEntry);
}
@NotNull
JsonObject getJsonObject(String key, long value, Optional<String> tsKvEntryOpt) {
JsonObject jsonObject = new JsonObject();
if (tsKvEntryOpt.isPresent()) {
jsonObject = new JsonParser().parse(tsKvEntryOpt.get()).getAsJsonObject();
}
jsonObject.addProperty(key, value);
return jsonObject;
}
void saveTimeseries(EntityId entityId, TsKvEntry saveTsKvEntry) throws InterruptedException, ExecutionException, TimeoutException {
timeseriesService.save(savedTenant.getId(), entityId, List.of(saveTsKvEntry), TTL).get(TIMEOUT, TimeUnit.SECONDS);
}
TsKvEntry getTsKvLatest(EntityId entityId, String key) throws InterruptedException, ExecutionException, TimeoutException {
List<TsKvEntry> tsKvEntries = timeseriesService.findLatest(
savedTenant.getTenantId(),
entityId,
List.of(key)).get(TIMEOUT, TimeUnit.SECONDS);
Assert.assertEquals(1, tsKvEntries.size());
return tsKvEntries.get(0);
}
}

View File

@ -46,8 +46,16 @@ import java.util.concurrent.TimeUnit;
configClazz = TbMsgTimeseriesNodeConfiguration.class, configClazz = TbMsgTimeseriesNodeConfiguration.class,
nodeDescription = "Saves timeseries data", nodeDescription = "Saves timeseries data",
nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type. " + nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type. " +
"Timestamp in milliseconds will be taken from metadata.ts, otherwise 'now' timestamp will be applied. " + "Timestamp in milliseconds will be taken from metadata.ts, otherwise 'now' message timestamp will be applied. " +
"Allows stopping updating values for incoming keys in the latest ts_kv table if 'skipLatestPersistence' is set to true.", "Allows stopping updating values for incoming keys in the latest ts_kv table if 'skipLatestPersistence' is set to true.\n " +
"<br/>" +
"Enable 'useServerTs' param to use the timestamp of the message processing instead of the timestamp from the message. " +
"Useful for all sorts of sequential processing if you merge messages from multiple sources (devices, assets, etc).\n" +
"<br/>" +
"In the case of sequential processing, the platform guarantees that the messages are processed in the order of their submission to the queue. " +
"However, the timestamp of the messages originated by multiple devices/servers may be unsynchronized long before they are pushed to the queue. " +
"The DB layer has certain optimizations to ignore the updates of the \"attributes\" and \"latest values\" tables if the new record has a timestamp that is older than the previous record. " +
"So, to make sure that all the messages will be processed correctly, one should enable this parameter for sequential message processing scenarios.",
uiResources = {"static/rulenode/rulenode-core-config.js"}, uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeTimeseriesConfig", configDirective = "tbActionNodeTimeseriesConfig",
icon = "file_upload" icon = "file_upload"
@ -77,7 +85,7 @@ public class TbMsgTimeseriesNode implements TbNode {
ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
return; return;
} }
long ts = getTs(msg); long ts = computeTs(msg, config.isUseServerTs());
String src = msg.getData(); String src = msg.getData();
Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts); Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);
if (tsKvMap.isEmpty()) { if (tsKvMap.isEmpty()) {
@ -102,6 +110,10 @@ public class TbMsgTimeseriesNode implements TbNode {
} }
} }
public static long computeTs(TbMsg msg, boolean ignoreMetadataTs) {
return ignoreMetadataTs ? System.currentTimeMillis() : getTs(msg);
}
public static long getTs(TbMsg msg) { public static long getTs(TbMsg msg) {
long ts = -1; long ts = -1;
String tsStr = msg.getMetaData().getValue("ts"); String tsStr = msg.getMetaData().getValue("ts");

View File

@ -23,12 +23,14 @@ public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration<TbMsg
private long defaultTTL; private long defaultTTL;
private boolean skipLatestPersistence; private boolean skipLatestPersistence;
private boolean useServerTs;
@Override @Override
public TbMsgTimeseriesNodeConfiguration defaultConfiguration() { public TbMsgTimeseriesNodeConfiguration defaultConfiguration() {
TbMsgTimeseriesNodeConfiguration configuration = new TbMsgTimeseriesNodeConfiguration(); TbMsgTimeseriesNodeConfiguration configuration = new TbMsgTimeseriesNodeConfiguration();
configuration.setDefaultTTL(0L); configuration.setDefaultTTL(0L);
configuration.setSkipLatestPersistence(false); configuration.setSkipLatestPersistence(false);
configuration.setUseServerTs(false);
return configuration; return configuration;
} }
} }