From 89e9d27baf446ec28c43343deed23dc9cdd987ae Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Thu, 8 Jun 2017 21:13:57 +0300 Subject: [PATCH] TTL Rule Implementation --- .../plugin/PluginProcessingContext.java | 8 ++- .../src/main/resources/actor-system.conf | 2 +- .../dao/timeseries/BaseTimeseriesDao.java | 68 ++++++++++++++----- .../dao/timeseries/BaseTimeseriesService.java | 18 +++-- .../server/dao/timeseries/TimeseriesDao.java | 7 +- .../dao/timeseries/TimeseriesService.java | 2 + .../extensions/api/plugins/PluginContext.java | 5 +- ...TelemetryUploadRequestRuleToPluginMsg.java | 7 +- .../telemetry/TelemetryPluginAction.java | 21 ++++-- .../TelemetryPluginActionConfiguration.java | 31 +++++++++ .../handlers/TelemetryRestMsgHandler.java | 8 ++- .../handlers/TelemetryRuleMsgHandler.java | 2 +- .../TelemetryPluginActionDescriptor.json | 50 ++++++++++++++ 13 files changed, 190 insertions(+), 39 deletions(-) create mode 100644 extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginActionConfiguration.java create mode 100644 extensions-core/src/main/resources/TelemetryPluginActionDescriptor.json diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java index 57a838f471..fcde477821 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java @@ -168,12 +168,18 @@ public final class PluginProcessingContext implements PluginContext { @Override public void saveTsData(final EntityId entityId, final List entries, final PluginCallback callback) { + saveTsData(entityId, entries, 0L, callback); + } + + @Override + public void saveTsData(final EntityId entityId, final List entries, long ttl, final PluginCallback callback) { validate(entityId, new ValidationCallback(callback, ctx -> { - ListenableFuture> rsListFuture = pluginCtx.tsService.save(entityId, entries); + ListenableFuture> rsListFuture = pluginCtx.tsService.save(entityId, entries, ttl); Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor); })); } + @Override public void loadTimeseries(final EntityId entityId, final List queries, final PluginCallback> callback) { validate(entityId, new ValidationCallback(callback, ctx -> { diff --git a/application/src/main/resources/actor-system.conf b/application/src/main/resources/actor-system.conf index e8e899a476..3cd319f616 100644 --- a/application/src/main/resources/actor-system.conf +++ b/application/src/main/resources/actor-system.conf @@ -19,7 +19,7 @@ akka { # JVM shutdown, System.exit(-1), in case of a fatal error, # such as OutOfMemoryError jvm-exit-on-fatal-error = off - loglevel = "INFO" + loglevel = "DEBUG" loggers = ["akka.event.slf4j.Slf4jLogger"] } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java index 26fe3fbb45..3de2127fd7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java @@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.*; import org.thingsboard.server.common.data.kv.DataType; import org.thingsboard.server.dao.AbstractAsyncDao; -import org.thingsboard.server.dao.AbstractDao; import org.thingsboard.server.dao.model.ModelConstants; import javax.annotation.Nullable; @@ -40,8 +39,6 @@ import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.Collectors; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; @@ -64,8 +61,10 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao private TsPartitionDate tsFormat; private PreparedStatement partitionInsertStmt; + private PreparedStatement partitionInsertTtlStmt; private PreparedStatement[] latestInsertStmts; private PreparedStatement[] saveStmts; + private PreparedStatement[] saveTtlStmts; private PreparedStatement[] fetchStmts; private PreparedStatement findLatestStmt; private PreparedStatement findAllLatestStmt; @@ -255,15 +254,32 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao } @Override - public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry) { + public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) { DataType type = tsKvEntry.getDataType(); - BoundStatement stmt = getSaveStmt(type).bind() - .setString(0, entityId.getEntityType().name()) + BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind(); + stmt.setString(0, entityId.getEntityType().name()) .setUUID(1, entityId.getId()) .setString(2, tsKvEntry.getKey()) .setLong(3, partition) .setLong(4, tsKvEntry.getTs()); addValue(tsKvEntry, stmt, 5); + if (ttl > 0) { + stmt.setInt(6, (int) ttl); + } + return executeAsyncWrite(stmt); + } + + @Override + public ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl) { + log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key); + BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind(); + stmt = stmt.setString(0, entityId.getEntityType().name()) + .setUUID(1, entityId.getId()) + .setLong(2, partition) + .setString(3, key); + if (ttl > 0) { + stmt.setInt(4, (int) ttl); + } return executeAsyncWrite(stmt); } @@ -279,16 +295,6 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao return executeAsyncWrite(stmt); } - @Override - public ResultSetFuture savePartition(EntityId entityId, long partition, String key) { - log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key); - return executeAsyncWrite(getPartitionInsertStmt().bind() - .setString(0, entityId.getEntityType().name()) - .setUUID(1, entityId.getId()) - .setLong(2, partition) - .setString(3, key)); - } - @Override public List convertResultToTsKvEntryList(List rows) { List entries = new ArrayList<>(rows.size()); @@ -365,6 +371,23 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao return saveStmts[dataType.ordinal()]; } + private PreparedStatement getSaveTtlStmt(DataType dataType) { + if (saveTtlStmts == null) { + saveTtlStmts = new PreparedStatement[DataType.values().length]; + for (DataType type : DataType.values()) { + saveTtlStmts[type.ordinal()] = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_CF + + "(" + ModelConstants.ENTITY_TYPE_COLUMN + + "," + ModelConstants.ENTITY_ID_COLUMN + + "," + ModelConstants.KEY_COLUMN + + "," + ModelConstants.PARTITION_COLUMN + + "," + ModelConstants.TS_COLUMN + + "," + getColumnName(type) + ")" + + " VALUES(?, ?, ?, ?, ?, ?) USING TTL ?"); + } + } + return saveTtlStmts[dataType.ordinal()]; + } + private PreparedStatement getFetchStmt(Aggregation aggType) { if (fetchStmts == null) { fetchStmts = new PreparedStatement[Aggregation.values().length]; @@ -418,6 +441,19 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao return partitionInsertStmt; } + private PreparedStatement getPartitionInsertTtlStmt() { + if (partitionInsertTtlStmt == null) { + partitionInsertTtlStmt = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_PARTITIONS_CF + + "(" + ModelConstants.ENTITY_TYPE_COLUMN + + "," + ModelConstants.ENTITY_ID_COLUMN + + "," + ModelConstants.PARTITION_COLUMN + + "," + ModelConstants.KEY_COLUMN + ")" + + " VALUES(?, ?, ?, ?) USING TTL ?"); + } + return partitionInsertTtlStmt; + } + + private PreparedStatement getFindLatestStmt() { if (findLatestStmt == null) { findLatestStmt = getSession().prepare("SELECT " + diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 5d722f995f..40135b2273 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -87,29 +87,33 @@ public class BaseTimeseriesService implements TimeseriesService { if (tsKvEntry == null) { throw new IncorrectParameterException("Key value entry can't be null"); } - UUID uid = entityId.getId(); long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs()); List futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY); - saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs); + saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs, 0L); return Futures.allAsList(futures); } @Override public ListenableFuture> save(EntityId entityId, List tsKvEntries) { + return save(entityId, tsKvEntries, 0L); + } + + @Override + public ListenableFuture> save(EntityId entityId, List tsKvEntries, long ttl) { validate(entityId); List futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY); for (TsKvEntry tsKvEntry : tsKvEntries) { if (tsKvEntry == null) { throw new IncorrectParameterException("Key value entry can't be null"); } - UUID uid = entityId.getId(); long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs()); - saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs); + saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs, ttl); } return Futures.allAsList(futures); } + @Override public TsKvEntry convertResultToTsKvEntry(Row row) { return timeseriesDao.convertResultToTsKvEntry(row); @@ -120,10 +124,10 @@ public class BaseTimeseriesService implements TimeseriesService { return timeseriesDao.convertResultToTsKvEntryList(rs.all()); } - private void saveAndRegisterFutures(List futures, EntityId entityId, TsKvEntry tsKvEntry, long partitionTs) { - futures.add(timeseriesDao.savePartition(entityId, partitionTs, tsKvEntry.getKey())); + private void saveAndRegisterFutures(List futures, EntityId entityId, TsKvEntry tsKvEntry, long partitionTs, long ttl) { + futures.add(timeseriesDao.savePartition(entityId, partitionTs, tsKvEntry.getKey(), ttl)); futures.add(timeseriesDao.saveLatest(entityId, tsKvEntry)); - futures.add(timeseriesDao.save(entityId, partitionTs, tsKvEntry)); + futures.add(timeseriesDao.save(entityId, partitionTs, tsKvEntry, ttl)); } private static void validate(EntityId entityId) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java index 1f9871eda1..08a61f61a2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java @@ -23,9 +23,6 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvQuery; import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; /** * @author Andrew Shvayka @@ -40,9 +37,9 @@ public interface TimeseriesDao { ResultSetFuture findAllLatest(EntityId entityId); - ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry); + ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl); - ResultSetFuture savePartition(EntityId entityId, long partition, String key); + ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl); ResultSetFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index 5c9c961b28..fe560747e9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -44,6 +44,8 @@ public interface TimeseriesService { ListenableFuture> save(EntityId entityId, List tsKvEntry); + ListenableFuture> save(EntityId entityId, List tsKvEntry, long ttl); + TsKvEntry convertResultToTsKvEntry(Row row); List convertResultSetToTsKvEntryList(ResultSet rs); diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java index 2477ac3d37..2d346e7724 100644 --- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java +++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java @@ -79,7 +79,9 @@ public interface PluginContext { void saveTsData(EntityId entityId, TsKvEntry entry, PluginCallback callback); - void saveTsData(EntityId entityId, List entry, PluginCallback callback); + void saveTsData(EntityId entityId, List entries, PluginCallback callback); + + void saveTsData(EntityId deviceId, List entries, long ttl, PluginCallback pluginCallback); void loadTimeseries(EntityId entityId, List queries, PluginCallback> callback); @@ -106,4 +108,5 @@ public interface PluginContext { void loadAttributes(EntityId entityId, Collection attributeTypes, Collection attributeKeys, PluginCallback> callback); void getCustomerDevices(TenantId tenantId, CustomerId customerId, int limit, PluginCallback> callback); + } diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/TelemetryUploadRequestRuleToPluginMsg.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/TelemetryUploadRequestRuleToPluginMsg.java index 2d1678a4c5..df5c900159 100644 --- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/TelemetryUploadRequestRuleToPluginMsg.java +++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/TelemetryUploadRequestRuleToPluginMsg.java @@ -23,9 +23,14 @@ import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; public class TelemetryUploadRequestRuleToPluginMsg extends AbstractRuleToPluginMsg { private static final long serialVersionUID = 1L; + private final long ttl; - public TelemetryUploadRequestRuleToPluginMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, TelemetryUploadRequest payload) { + public TelemetryUploadRequestRuleToPluginMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, TelemetryUploadRequest payload, long ttl) { super(tenantId, customerId, deviceId, payload); + this.ttl = ttl; } + public long getTtl() { + return ttl; + } } diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginAction.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginAction.java index 63006aa9bb..ce6fd57c5f 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginAction.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginAction.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.extensions.core.action.telemetry; +import org.springframework.util.StringUtils; import org.thingsboard.server.common.msg.core.GetAttributesRequest; import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; import org.thingsboard.server.common.msg.core.UpdateAttributesRequest; @@ -23,7 +24,6 @@ import org.thingsboard.server.common.msg.session.FromDeviceMsg; import org.thingsboard.server.common.msg.session.MsgType; import org.thingsboard.server.common.msg.session.ToDeviceMsg; import org.thingsboard.server.extensions.api.component.Action; -import org.thingsboard.server.extensions.api.component.EmptyComponentConfiguration; import org.thingsboard.server.extensions.api.plugins.PluginAction; import org.thingsboard.server.extensions.api.plugins.msg.*; import org.thingsboard.server.extensions.api.rules.RuleContext; @@ -31,11 +31,22 @@ import org.thingsboard.server.extensions.api.rules.RuleProcessingMetaData; import org.thingsboard.server.extensions.api.rules.SimpleRuleLifecycleComponent; import java.util.Optional; +import java.util.concurrent.TimeUnit; -@Action(name = "Telemetry Plugin Action") -public class TelemetryPluginAction extends SimpleRuleLifecycleComponent implements PluginAction { +@Action(name = "Telemetry Plugin Action", descriptor = "TelemetryPluginActionDescriptor.json", configuration = TelemetryPluginActionConfiguration.class) +public class TelemetryPluginAction extends SimpleRuleLifecycleComponent implements PluginAction { - public void init(EmptyComponentConfiguration configuration) { + protected TelemetryPluginActionConfiguration configuration; + protected long ttl; + + @Override + public void init(TelemetryPluginActionConfiguration configuration) { + this.configuration = configuration; + if (StringUtils.isEmpty(configuration.getTimeUnit()) || configuration.getTtlValue() == 0L) { + this.ttl = 0L; + } else { + this.ttl = TimeUnit.valueOf(configuration.getTimeUnit()).toSeconds(configuration.getTtlValue()); + } } @Override @@ -44,7 +55,7 @@ public class TelemetryPluginAction extends SimpleRuleLifecycleComponent implemen if (msg.getMsgType() == MsgType.POST_TELEMETRY_REQUEST) { TelemetryUploadRequest payload = (TelemetryUploadRequest) msg; return Optional.of(new TelemetryUploadRequestRuleToPluginMsg(toDeviceActorMsg.getTenantId(), toDeviceActorMsg.getCustomerId(), - toDeviceActorMsg.getDeviceId(), payload)); + toDeviceActorMsg.getDeviceId(), payload, ttl)); } else if (msg.getMsgType() == MsgType.POST_ATTRIBUTES_REQUEST) { UpdateAttributesRequest payload = (UpdateAttributesRequest) msg; return Optional.of(new UpdateAttributesRequestRuleToPluginMsg(toDeviceActorMsg.getTenantId(), toDeviceActorMsg.getCustomerId(), diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginActionConfiguration.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginActionConfiguration.java new file mode 100644 index 0000000000..896481eb46 --- /dev/null +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/action/telemetry/TelemetryPluginActionConfiguration.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2017 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.extensions.core.action.telemetry; + +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.Data; + +/** + * @author Andrew Shvayka + */ +@Data +@JsonInclude(JsonInclude.Include.NON_NULL) +public class TelemetryPluginActionConfiguration { + + private String timeUnit; + private int ttlValue; + +} diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java index 5b32474b17..440e28b6d0 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java @@ -148,6 +148,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { String[] pathParams = request.getPathParams(); EntityId entityId; String scope; + long ttl = 0L; TelemetryFeature feature; if (pathParams.length == 2) { entityId = DeviceId.fromString(pathParams[0]); @@ -161,6 +162,11 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { entityId = EntityIdFactory.getByTypeAndId(pathParams[0], pathParams[1]); feature = TelemetryFeature.forName(pathParams[2].toUpperCase()); scope = pathParams[3]; + } else if (pathParams.length == 5) { + entityId = EntityIdFactory.getByTypeAndId(pathParams[0], pathParams[1]); + feature = TelemetryFeature.forName(pathParams[2].toUpperCase()); + scope = pathParams[3]; + ttl = Long.parseLong(pathParams[4]); } else { msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); return; @@ -211,7 +217,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { entries.add(new BasicTsKvEntry(entry.getKey(), kv)); } } - ctx.saveTsData(entityId, entries, new PluginCallback() { + ctx.saveTsData(entityId, entries, ttl, new PluginCallback() { @Override public void onSuccess(PluginContext ctx, Void value) { msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK)); diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java index 1ce797fe4b..242345d859 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java @@ -92,7 +92,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler { tsKvEntries.add(new BasicTsKvEntry(entry.getKey(), kv)); } } - ctx.saveTsData(msg.getDeviceId(), tsKvEntries, new PluginCallback() { + ctx.saveTsData(msg.getDeviceId(), tsKvEntries, msg.getTtl(), new PluginCallback() { @Override public void onSuccess(PluginContext ctx, Void data) { ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onSuccess(request.getMsgType(), request.getRequestId()))); diff --git a/extensions-core/src/main/resources/TelemetryPluginActionDescriptor.json b/extensions-core/src/main/resources/TelemetryPluginActionDescriptor.json new file mode 100644 index 0000000000..04868166e1 --- /dev/null +++ b/extensions-core/src/main/resources/TelemetryPluginActionDescriptor.json @@ -0,0 +1,50 @@ +{ + "schema": { + "title": "Telemetry Plugin Action Configuration", + "type": "object", + "properties": { + "timeUnit": { + "title": "Time Unit", + "type": "string", + "default": "DAYS" + }, + "ttlValue": { + "title": "TTL", + "type": "integer", + "default": 365, + "minimum": 0, + "maximum": 100000 + } + }, + "required": [ + "timeUnit", + "ttlValue" + ] + }, + "form": [ + { + "key": "timeUnit", + "type": "rc-select", + "multiple": false, + "items": [ + { + "value": "SECONDS", + "label": "Seconds" + }, + { + "value": "MINUTES", + "label": "Minutes" + }, + { + "value": "HOURS", + "label": "Hours" + }, + { + "value": "DAYS", + "label": "Days" + } + ] + }, + "ttlValue" + ] +} \ No newline at end of file