Merge remote-tracking branch 'upstream/master' into dao-refactoring-vs
This commit is contained in:
		
						commit
						11a2844072
					
				@ -166,12 +166,18 @@ public final class PluginProcessingContext implements PluginContext {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void saveTsData(final EntityId entityId, final List<TsKvEntry> entries, final PluginCallback<Void> callback) {
 | 
			
		||||
        saveTsData(entityId, entries, 0L, callback);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void saveTsData(final EntityId entityId, final List<TsKvEntry> entries, long ttl, final PluginCallback<Void> callback) {
 | 
			
		||||
        validate(entityId, new ValidationCallback(callback, ctx -> {
 | 
			
		||||
            ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(entityId, entries);
 | 
			
		||||
            ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(entityId, entries, ttl);
 | 
			
		||||
            Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
 | 
			
		||||
        }));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void loadTimeseries(final EntityId entityId, final List<TsKvQuery> queries, final PluginCallback<List<TsKvEntry>> callback) {
 | 
			
		||||
        validate(entityId, new ValidationCallback(callback, ctx -> {
 | 
			
		||||
@ -324,7 +330,19 @@ public final class PluginProcessingContext implements PluginContext {
 | 
			
		||||
                            callback.onSuccess(this, Boolean.FALSE);
 | 
			
		||||
                        } else {
 | 
			
		||||
                            ListenableFuture<RuleMetaData> ruleFuture = pluginCtx.ruleService.findRuleByIdAsync(new RuleId(entityId.getId()));
 | 
			
		||||
                            Futures.addCallback(ruleFuture, getCallback(callback, rule -> rule != null && rule.getTenantId().equals(ctx.getTenantId())));
 | 
			
		||||
                            Futures.addCallback(ruleFuture, getCallback(callback, rule -> {
 | 
			
		||||
                                if (rule == null) {
 | 
			
		||||
                                    return Boolean.FALSE;
 | 
			
		||||
                                } else {
 | 
			
		||||
                                    if (ctx.isTenantAdmin() && !rule.getTenantId().equals(ctx.getTenantId())) {
 | 
			
		||||
                                        return Boolean.FALSE;
 | 
			
		||||
                                    } else if (ctx.isSystemAdmin() && !rule.getTenantId().isNullUid()) {
 | 
			
		||||
                                        return Boolean.FALSE;
 | 
			
		||||
                                    } else {
 | 
			
		||||
                                        return Boolean.TRUE;
 | 
			
		||||
                                    }
 | 
			
		||||
                                }
 | 
			
		||||
                            }));
 | 
			
		||||
                        }
 | 
			
		||||
                        return;
 | 
			
		||||
                    case PLUGIN:
 | 
			
		||||
@ -332,7 +350,19 @@ public final class PluginProcessingContext implements PluginContext {
 | 
			
		||||
                            callback.onSuccess(this, Boolean.FALSE);
 | 
			
		||||
                        } else {
 | 
			
		||||
                            ListenableFuture<PluginMetaData> pluginFuture = pluginCtx.pluginService.findPluginByIdAsync(new PluginId(entityId.getId()));
 | 
			
		||||
                            Futures.addCallback(pluginFuture, getCallback(callback, plugin -> plugin != null && plugin.getTenantId().equals(ctx.getTenantId())));
 | 
			
		||||
                            Futures.addCallback(pluginFuture, getCallback(callback, plugin -> {
 | 
			
		||||
                                if (plugin == null) {
 | 
			
		||||
                                    return Boolean.FALSE;
 | 
			
		||||
                                } else {
 | 
			
		||||
                                    if (ctx.isTenantAdmin() && !plugin.getTenantId().equals(ctx.getTenantId())) {
 | 
			
		||||
                                        return Boolean.FALSE;
 | 
			
		||||
                                    } else if (ctx.isSystemAdmin() && !plugin.getTenantId().isNullUid()) {
 | 
			
		||||
                                        return Boolean.FALSE;
 | 
			
		||||
                                    } else {
 | 
			
		||||
                                        return Boolean.TRUE;
 | 
			
		||||
                                    }
 | 
			
		||||
                                }
 | 
			
		||||
                            }));
 | 
			
		||||
                        }
 | 
			
		||||
                        return;
 | 
			
		||||
                    case CUSTOMER:
 | 
			
		||||
 | 
			
		||||
@ -19,7 +19,7 @@ akka {
 | 
			
		||||
  # JVM shutdown, System.exit(-1), in case of a fatal error,
 | 
			
		||||
  # such as OutOfMemoryError
 | 
			
		||||
  jvm-exit-on-fatal-error = off
 | 
			
		||||
  loglevel = "INFO"
 | 
			
		||||
  loglevel = "DEBUG"
 | 
			
		||||
  loggers = ["akka.event.slf4j.Slf4jLogger"]
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -76,29 +76,33 @@ public class BaseTimeseriesService implements TimeseriesService {
 | 
			
		||||
        if (tsKvEntry == null) {
 | 
			
		||||
            throw new IncorrectParameterException("Key value entry can't be null");
 | 
			
		||||
        }
 | 
			
		||||
        UUID uid = entityId.getId();
 | 
			
		||||
        long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
 | 
			
		||||
 | 
			
		||||
        List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
 | 
			
		||||
        saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs);
 | 
			
		||||
        saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs, 0L);
 | 
			
		||||
        return Futures.allAsList(futures);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntries) {
 | 
			
		||||
        return save(entityId, tsKvEntries, 0L);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
 | 
			
		||||
        validate(entityId);
 | 
			
		||||
        List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY);
 | 
			
		||||
        for (TsKvEntry tsKvEntry : tsKvEntries) {
 | 
			
		||||
            if (tsKvEntry == null) {
 | 
			
		||||
                throw new IncorrectParameterException("Key value entry can't be null");
 | 
			
		||||
            }
 | 
			
		||||
            UUID uid = entityId.getId();
 | 
			
		||||
            long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
 | 
			
		||||
            saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs);
 | 
			
		||||
            saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs, ttl);
 | 
			
		||||
        }
 | 
			
		||||
        return Futures.allAsList(futures);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TsKvEntry convertResultToTsKvEntry(Row row) {
 | 
			
		||||
        return timeseriesDao.convertResultToTsKvEntry(row);
 | 
			
		||||
@ -109,10 +113,10 @@ public class BaseTimeseriesService implements TimeseriesService {
 | 
			
		||||
        return timeseriesDao.convertResultToTsKvEntryList(rs.all());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void saveAndRegisterFutures(List<ResultSetFuture> futures, EntityId entityId, TsKvEntry tsKvEntry, long partitionTs) {
 | 
			
		||||
        futures.add(timeseriesDao.savePartition(entityId, partitionTs, tsKvEntry.getKey()));
 | 
			
		||||
    private void saveAndRegisterFutures(List<ResultSetFuture> futures, EntityId entityId, TsKvEntry tsKvEntry, long partitionTs, long ttl) {
 | 
			
		||||
        futures.add(timeseriesDao.savePartition(entityId, partitionTs, tsKvEntry.getKey(), ttl));
 | 
			
		||||
        futures.add(timeseriesDao.saveLatest(entityId, tsKvEntry));
 | 
			
		||||
        futures.add(timeseriesDao.save(entityId, partitionTs, tsKvEntry));
 | 
			
		||||
        futures.add(timeseriesDao.save(entityId, partitionTs, tsKvEntry, ttl));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static void validate(EntityId entityId) {
 | 
			
		||||
 | 
			
		||||
@ -42,7 +42,6 @@ import java.time.ZoneOffset;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 | 
			
		||||
@ -65,8 +64,10 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
    private TsPartitionDate tsFormat;
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement partitionInsertStmt;
 | 
			
		||||
    private PreparedStatement partitionInsertTtlStmt;
 | 
			
		||||
    private PreparedStatement[] latestInsertStmts;
 | 
			
		||||
    private PreparedStatement[] saveStmts;
 | 
			
		||||
    private PreparedStatement[] saveTtlStmts;
 | 
			
		||||
    private PreparedStatement[] fetchStmts;
 | 
			
		||||
    private PreparedStatement findLatestStmt;
 | 
			
		||||
    private PreparedStatement findAllLatestStmt;
 | 
			
		||||
@ -256,15 +257,32 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry) {
 | 
			
		||||
    public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
 | 
			
		||||
        DataType type = tsKvEntry.getDataType();
 | 
			
		||||
        BoundStatement stmt = getSaveStmt(type).bind()
 | 
			
		||||
                .setString(0, entityId.getEntityType().name())
 | 
			
		||||
        BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind();
 | 
			
		||||
        stmt.setString(0, entityId.getEntityType().name())
 | 
			
		||||
                .setUUID(1, entityId.getId())
 | 
			
		||||
                .setString(2, tsKvEntry.getKey())
 | 
			
		||||
                .setLong(3, partition)
 | 
			
		||||
                .setLong(4, tsKvEntry.getTs());
 | 
			
		||||
        addValue(tsKvEntry, stmt, 5);
 | 
			
		||||
        if (ttl > 0) {
 | 
			
		||||
            stmt.setInt(6, (int) ttl);
 | 
			
		||||
        }
 | 
			
		||||
        return executeAsyncWrite(stmt);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl) {
 | 
			
		||||
        log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
 | 
			
		||||
        BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind();
 | 
			
		||||
        stmt = stmt.setString(0, entityId.getEntityType().name())
 | 
			
		||||
                .setUUID(1, entityId.getId())
 | 
			
		||||
                .setLong(2, partition)
 | 
			
		||||
                .setString(3, key);
 | 
			
		||||
        if (ttl > 0) {
 | 
			
		||||
            stmt.setInt(4, (int) ttl);
 | 
			
		||||
        }
 | 
			
		||||
        return executeAsyncWrite(stmt);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -280,16 +298,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
        return executeAsyncWrite(stmt);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ResultSetFuture savePartition(EntityId entityId, long partition, String key) {
 | 
			
		||||
        log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
 | 
			
		||||
        return executeAsyncWrite(getPartitionInsertStmt().bind()
 | 
			
		||||
                .setString(0, entityId.getEntityType().name())
 | 
			
		||||
                .setUUID(1, entityId.getId())
 | 
			
		||||
                .setLong(2, partition)
 | 
			
		||||
                .setString(3, key));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
 | 
			
		||||
        List<TsKvEntry> entries = new ArrayList<>(rows.size());
 | 
			
		||||
@ -366,6 +374,23 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
        return saveStmts[dataType.ordinal()];
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getSaveTtlStmt(DataType dataType) {
 | 
			
		||||
        if (saveTtlStmts == null) {
 | 
			
		||||
            saveTtlStmts = new PreparedStatement[DataType.values().length];
 | 
			
		||||
            for (DataType type : DataType.values()) {
 | 
			
		||||
                saveTtlStmts[type.ordinal()] = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_CF +
 | 
			
		||||
                        "(" + ModelConstants.ENTITY_TYPE_COLUMN +
 | 
			
		||||
                        "," + ModelConstants.ENTITY_ID_COLUMN +
 | 
			
		||||
                        "," + ModelConstants.KEY_COLUMN +
 | 
			
		||||
                        "," + ModelConstants.PARTITION_COLUMN +
 | 
			
		||||
                        "," + ModelConstants.TS_COLUMN +
 | 
			
		||||
                        "," + getColumnName(type) + ")" +
 | 
			
		||||
                        " VALUES(?, ?, ?, ?, ?, ?) USING TTL ?");
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return saveTtlStmts[dataType.ordinal()];
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getFetchStmt(Aggregation aggType) {
 | 
			
		||||
        if (fetchStmts == null) {
 | 
			
		||||
            fetchStmts = new PreparedStatement[Aggregation.values().length];
 | 
			
		||||
@ -419,6 +444,19 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
        return partitionInsertStmt;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getPartitionInsertTtlStmt() {
 | 
			
		||||
        if (partitionInsertTtlStmt == null) {
 | 
			
		||||
            partitionInsertTtlStmt = getSession().prepare("INSERT INTO " + ModelConstants.TS_KV_PARTITIONS_CF +
 | 
			
		||||
                    "(" + ModelConstants.ENTITY_TYPE_COLUMN +
 | 
			
		||||
                    "," + ModelConstants.ENTITY_ID_COLUMN +
 | 
			
		||||
                    "," + ModelConstants.PARTITION_COLUMN +
 | 
			
		||||
                    "," + ModelConstants.KEY_COLUMN + ")" +
 | 
			
		||||
                    " VALUES(?, ?, ?, ?) USING TTL ?");
 | 
			
		||||
        }
 | 
			
		||||
        return partitionInsertTtlStmt;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    private PreparedStatement getFindLatestStmt() {
 | 
			
		||||
        if (findLatestStmt == null) {
 | 
			
		||||
            findLatestStmt = getSession().prepare("SELECT " +
 | 
			
		||||
 | 
			
		||||
@ -23,7 +23,6 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvQuery;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @author Andrew Shvayka
 | 
			
		||||
@ -38,9 +37,9 @@ public interface TimeseriesDao {
 | 
			
		||||
 | 
			
		||||
    ResultSetFuture findAllLatest(EntityId entityId);
 | 
			
		||||
 | 
			
		||||
    ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry);
 | 
			
		||||
    ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl);
 | 
			
		||||
 | 
			
		||||
    ResultSetFuture savePartition(EntityId entityId, long partition, String key);
 | 
			
		||||
    ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl);
 | 
			
		||||
 | 
			
		||||
    ResultSetFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -42,6 +42,8 @@ public interface TimeseriesService {
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntry);
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
 | 
			
		||||
 | 
			
		||||
    TsKvEntry convertResultToTsKvEntry(Row row);
 | 
			
		||||
 | 
			
		||||
    List<TsKvEntry> convertResultSetToTsKvEntryList(ResultSet rs);
 | 
			
		||||
 | 
			
		||||
@ -79,7 +79,9 @@ public interface PluginContext {
 | 
			
		||||
 | 
			
		||||
    void saveTsData(EntityId entityId, TsKvEntry entry, PluginCallback<Void> callback);
 | 
			
		||||
 | 
			
		||||
    void saveTsData(EntityId entityId, List<TsKvEntry> entry, PluginCallback<Void> callback);
 | 
			
		||||
    void saveTsData(EntityId entityId, List<TsKvEntry> entries, PluginCallback<Void> callback);
 | 
			
		||||
 | 
			
		||||
    void saveTsData(EntityId deviceId, List<TsKvEntry> entries, long ttl, PluginCallback<Void> pluginCallback);
 | 
			
		||||
 | 
			
		||||
    void loadTimeseries(EntityId entityId, List<TsKvQuery> queries, PluginCallback<List<TsKvEntry>> callback);
 | 
			
		||||
 | 
			
		||||
@ -106,4 +108,5 @@ public interface PluginContext {
 | 
			
		||||
    void loadAttributes(EntityId entityId, Collection<String> attributeTypes, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback);
 | 
			
		||||
 | 
			
		||||
    void getCustomerDevices(TenantId tenantId, CustomerId customerId, int limit, PluginCallback<List<Device>> callback);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -23,9 +23,14 @@ import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
 | 
			
		||||
public class TelemetryUploadRequestRuleToPluginMsg extends AbstractRuleToPluginMsg<TelemetryUploadRequest> {
 | 
			
		||||
 | 
			
		||||
    private static final long serialVersionUID = 1L;
 | 
			
		||||
    private final long ttl;
 | 
			
		||||
 | 
			
		||||
    public TelemetryUploadRequestRuleToPluginMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, TelemetryUploadRequest payload) {
 | 
			
		||||
    public TelemetryUploadRequestRuleToPluginMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, TelemetryUploadRequest payload, long ttl) {
 | 
			
		||||
        super(tenantId, customerId, deviceId, payload);
 | 
			
		||||
        this.ttl = ttl;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public long getTtl() {
 | 
			
		||||
        return ttl;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -15,6 +15,7 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.extensions.core.action.telemetry;
 | 
			
		||||
 | 
			
		||||
import org.springframework.util.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.msg.core.GetAttributesRequest;
 | 
			
		||||
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
 | 
			
		||||
import org.thingsboard.server.common.msg.core.UpdateAttributesRequest;
 | 
			
		||||
@ -23,7 +24,6 @@ import org.thingsboard.server.common.msg.session.FromDeviceMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.MsgType;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
 | 
			
		||||
import org.thingsboard.server.extensions.api.component.Action;
 | 
			
		||||
import org.thingsboard.server.extensions.api.component.EmptyComponentConfiguration;
 | 
			
		||||
import org.thingsboard.server.extensions.api.plugins.PluginAction;
 | 
			
		||||
import org.thingsboard.server.extensions.api.plugins.msg.*;
 | 
			
		||||
import org.thingsboard.server.extensions.api.rules.RuleContext;
 | 
			
		||||
@ -31,11 +31,22 @@ import org.thingsboard.server.extensions.api.rules.RuleProcessingMetaData;
 | 
			
		||||
import org.thingsboard.server.extensions.api.rules.SimpleRuleLifecycleComponent;
 | 
			
		||||
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
 | 
			
		||||
@Action(name = "Telemetry Plugin Action")
 | 
			
		||||
public class TelemetryPluginAction extends SimpleRuleLifecycleComponent implements PluginAction<EmptyComponentConfiguration> {
 | 
			
		||||
@Action(name = "Telemetry Plugin Action", descriptor = "TelemetryPluginActionDescriptor.json", configuration = TelemetryPluginActionConfiguration.class)
 | 
			
		||||
public class TelemetryPluginAction extends SimpleRuleLifecycleComponent implements PluginAction<TelemetryPluginActionConfiguration> {
 | 
			
		||||
 | 
			
		||||
    public void init(EmptyComponentConfiguration configuration) {
 | 
			
		||||
    protected TelemetryPluginActionConfiguration configuration;
 | 
			
		||||
    protected long ttl;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TelemetryPluginActionConfiguration configuration) {
 | 
			
		||||
        this.configuration = configuration;
 | 
			
		||||
        if (StringUtils.isEmpty(configuration.getTimeUnit()) || configuration.getTtlValue() == 0L) {
 | 
			
		||||
            this.ttl = 0L;
 | 
			
		||||
        } else {
 | 
			
		||||
            this.ttl = TimeUnit.valueOf(configuration.getTimeUnit()).toSeconds(configuration.getTtlValue());
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -44,7 +55,7 @@ public class TelemetryPluginAction extends SimpleRuleLifecycleComponent implemen
 | 
			
		||||
        if (msg.getMsgType() == MsgType.POST_TELEMETRY_REQUEST) {
 | 
			
		||||
            TelemetryUploadRequest payload = (TelemetryUploadRequest) msg;
 | 
			
		||||
            return Optional.of(new TelemetryUploadRequestRuleToPluginMsg(toDeviceActorMsg.getTenantId(), toDeviceActorMsg.getCustomerId(),
 | 
			
		||||
                    toDeviceActorMsg.getDeviceId(), payload));
 | 
			
		||||
                    toDeviceActorMsg.getDeviceId(), payload, ttl));
 | 
			
		||||
        } else if (msg.getMsgType() == MsgType.POST_ATTRIBUTES_REQUEST) {
 | 
			
		||||
            UpdateAttributesRequest payload = (UpdateAttributesRequest) msg;
 | 
			
		||||
            return Optional.of(new UpdateAttributesRequestRuleToPluginMsg(toDeviceActorMsg.getTenantId(), toDeviceActorMsg.getCustomerId(),
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,31 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2017 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.extensions.core.action.telemetry;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.annotation.JsonInclude;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @author Andrew Shvayka
 | 
			
		||||
 */
 | 
			
		||||
@Data
 | 
			
		||||
@JsonInclude(JsonInclude.Include.NON_NULL)
 | 
			
		||||
public class TelemetryPluginActionConfiguration {
 | 
			
		||||
 | 
			
		||||
    private String timeUnit;
 | 
			
		||||
    private int ttlValue;
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -148,6 +148,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
            String[] pathParams = request.getPathParams();
 | 
			
		||||
            EntityId entityId;
 | 
			
		||||
            String scope;
 | 
			
		||||
            long ttl = 0L;
 | 
			
		||||
            TelemetryFeature feature;
 | 
			
		||||
            if (pathParams.length == 2) {
 | 
			
		||||
                entityId = DeviceId.fromString(pathParams[0]);
 | 
			
		||||
@ -161,6 +162,11 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
                entityId = EntityIdFactory.getByTypeAndId(pathParams[0], pathParams[1]);
 | 
			
		||||
                feature = TelemetryFeature.forName(pathParams[2].toUpperCase());
 | 
			
		||||
                scope = pathParams[3];
 | 
			
		||||
            } else if (pathParams.length == 5) {
 | 
			
		||||
                entityId = EntityIdFactory.getByTypeAndId(pathParams[0], pathParams[1]);
 | 
			
		||||
                feature = TelemetryFeature.forName(pathParams[2].toUpperCase());
 | 
			
		||||
                scope = pathParams[3];
 | 
			
		||||
                ttl = Long.parseLong(pathParams[4]);
 | 
			
		||||
            } else {
 | 
			
		||||
                msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
 | 
			
		||||
                return;
 | 
			
		||||
@ -211,7 +217,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 | 
			
		||||
                        entries.add(new BasicTsKvEntry(entry.getKey(), kv));
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                ctx.saveTsData(entityId, entries, new PluginCallback<Void>() {
 | 
			
		||||
                ctx.saveTsData(entityId, entries, ttl, new PluginCallback<Void>() {
 | 
			
		||||
                    @Override
 | 
			
		||||
                    public void onSuccess(PluginContext ctx, Void value) {
 | 
			
		||||
                        msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
 | 
			
		||||
 | 
			
		||||
@ -92,7 +92,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
 | 
			
		||||
                tsKvEntries.add(new BasicTsKvEntry(entry.getKey(), kv));
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        ctx.saveTsData(msg.getDeviceId(), tsKvEntries, new PluginCallback<Void>() {
 | 
			
		||||
        ctx.saveTsData(msg.getDeviceId(), tsKvEntries, msg.getTtl(), new PluginCallback<Void>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(PluginContext ctx, Void data) {
 | 
			
		||||
                ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onSuccess(request.getMsgType(), request.getRequestId())));
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,50 @@
 | 
			
		||||
{
 | 
			
		||||
  "schema": {
 | 
			
		||||
    "title": "Telemetry Plugin Action Configuration",
 | 
			
		||||
    "type": "object",
 | 
			
		||||
    "properties": {
 | 
			
		||||
      "timeUnit": {
 | 
			
		||||
        "title": "Time Unit",
 | 
			
		||||
        "type": "string",
 | 
			
		||||
        "default": "DAYS"
 | 
			
		||||
      },
 | 
			
		||||
      "ttlValue": {
 | 
			
		||||
        "title": "TTL",
 | 
			
		||||
        "type": "integer",
 | 
			
		||||
        "default": 365,
 | 
			
		||||
        "minimum": 0,
 | 
			
		||||
        "maximum": 100000
 | 
			
		||||
      }
 | 
			
		||||
    },
 | 
			
		||||
    "required": [
 | 
			
		||||
      "timeUnit",
 | 
			
		||||
      "ttlValue"
 | 
			
		||||
    ]
 | 
			
		||||
  },
 | 
			
		||||
  "form": [
 | 
			
		||||
    {
 | 
			
		||||
      "key": "timeUnit",
 | 
			
		||||
      "type": "rc-select",
 | 
			
		||||
      "multiple": false,
 | 
			
		||||
      "items": [
 | 
			
		||||
        {
 | 
			
		||||
          "value": "SECONDS",
 | 
			
		||||
          "label": "Seconds"
 | 
			
		||||
        },
 | 
			
		||||
        {
 | 
			
		||||
          "value": "MINUTES",
 | 
			
		||||
          "label": "Minutes"
 | 
			
		||||
        },
 | 
			
		||||
        {
 | 
			
		||||
          "value": "HOURS",
 | 
			
		||||
          "label": "Hours"
 | 
			
		||||
        },
 | 
			
		||||
        {
 | 
			
		||||
          "value": "DAYS",
 | 
			
		||||
          "label": "Days"
 | 
			
		||||
        }
 | 
			
		||||
      ]
 | 
			
		||||
    },
 | 
			
		||||
    "ttlValue"
 | 
			
		||||
  ]
 | 
			
		||||
}
 | 
			
		||||
@ -51,7 +51,6 @@ export default function AttributeTableDirective($compile, $templateCache, $rootS
 | 
			
		||||
        scope.types = types;
 | 
			
		||||
 | 
			
		||||
        scope.entityType = attrs.entityType;
 | 
			
		||||
        scope.attributeScope = getAttributeScopeByValue(attrs.defaultAttributeScope);
 | 
			
		||||
 | 
			
		||||
        if (scope.entityType === types.entityType.device) {
 | 
			
		||||
            scope.attributeScopes = types.attributesScope;
 | 
			
		||||
@ -60,8 +59,13 @@ export default function AttributeTableDirective($compile, $templateCache, $rootS
 | 
			
		||||
            scope.attributeScopes = {};
 | 
			
		||||
            scope.attributeScopes.server = types.attributesScope.server;
 | 
			
		||||
            scope.attributeScopeSelectionReadonly = true;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        scope.attributeScope = getAttributeScopeByValue(attrs.defaultAttributeScope);
 | 
			
		||||
 | 
			
		||||
        if (scope.entityType != types.entityType.device) {
 | 
			
		||||
            if (scope.attributeScope != types.latestTelemetry) {
 | 
			
		||||
                scope.attributeScope = scope.attributeScopes.server.value;
 | 
			
		||||
                scope.attributeScope = scope.attributeScopes.server;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -81,8 +85,8 @@ export default function AttributeTableDirective($compile, $templateCache, $rootS
 | 
			
		||||
            search: null
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        scope.$watch("entityId", function(newVal, prevVal) {
 | 
			
		||||
            if (newVal && !angular.equals(newVal, prevVal)) {
 | 
			
		||||
        scope.$watch("entityId", function(newVal) {
 | 
			
		||||
            if (newVal) {
 | 
			
		||||
                scope.resetFilter();
 | 
			
		||||
                scope.getEntityAttributes(false, true);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
@ -171,7 +171,7 @@ export default angular.module('thingsboard.locale', [])
 | 
			
		||||
                "attribute": {
 | 
			
		||||
                    "attributes": "Attributes",
 | 
			
		||||
                    "latest-telemetry": "Latest telemetry",
 | 
			
		||||
                    "attributes-scope": "Device attributes scope",
 | 
			
		||||
                    "attributes-scope": "Entity attributes scope",
 | 
			
		||||
                    "scope-latest-telemetry": "Latest telemetry",
 | 
			
		||||
                    "scope-client": "Client attributes",
 | 
			
		||||
                    "scope-server": "Server attributes",
 | 
			
		||||
 | 
			
		||||
@ -138,6 +138,8 @@ export default function PluginController(pluginService, userService, importExpor
 | 
			
		||||
        vm.pluginGridConfig.topIndex = $stateParams.topIndex;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    vm.isPluginEditable = isPluginEditable;
 | 
			
		||||
 | 
			
		||||
    vm.activatePlugin = activatePlugin;
 | 
			
		||||
    vm.suspendPlugin = suspendPlugin;
 | 
			
		||||
    vm.exportPlugin = exportPlugin;
 | 
			
		||||
 | 
			
		||||
@ -19,7 +19,7 @@
 | 
			
		||||
    <details-buttons tb-help="vm.helpLinkIdForPlugin()" help-container-id="help-container">
 | 
			
		||||
        <div id="help-container"></div>
 | 
			
		||||
    </details-buttons>
 | 
			
		||||
    <md-tabs ng-class="{'tb-headless': vm.grid.detailsConfig.isDetailsEditMode}"
 | 
			
		||||
    <md-tabs ng-class="{'tb-headless': (vm.grid.detailsConfig.isDetailsEditMode || !vm.isPluginEditable(vm.grid.operatingItem()))}"
 | 
			
		||||
             id="tabs" md-border-bottom flex class="tb-absolute-fill">
 | 
			
		||||
        <md-tab label="{{ 'plugin.details' | translate }}">
 | 
			
		||||
            <tb-plugin plugin="vm.grid.operatingItem()"
 | 
			
		||||
@ -31,7 +31,7 @@
 | 
			
		||||
                 on-export-plugin="vm.exportPlugin(event, vm.grid.detailsConfig.currentItem)"
 | 
			
		||||
                 on-delete-plugin="vm.grid.deleteItem(event, vm.grid.detailsConfig.currentItem)"></tb-plugin>
 | 
			
		||||
        </md-tab>
 | 
			
		||||
        <md-tab ng-if="!vm.grid.detailsConfig.isDetailsEditMode" label="{{ 'attribute.attributes' | translate }}">
 | 
			
		||||
        <md-tab ng-if="!vm.grid.detailsConfig.isDetailsEditMode && vm.isPluginEditable(vm.grid.operatingItem())" label="{{ 'attribute.attributes' | translate }}">
 | 
			
		||||
            <tb-attribute-table flex
 | 
			
		||||
                                entity-id="vm.grid.operatingItem().id.id"
 | 
			
		||||
                                entity-type="{{vm.types.entityType.plugin}}"
 | 
			
		||||
@ -39,7 +39,7 @@
 | 
			
		||||
                                default-attribute-scope="{{vm.types.attributesScope.server.value}}">
 | 
			
		||||
            </tb-attribute-table>
 | 
			
		||||
        </md-tab>
 | 
			
		||||
        <md-tab ng-if="!vm.grid.detailsConfig.isDetailsEditMode" label="{{ 'attribute.latest-telemetry' | translate }}">
 | 
			
		||||
        <md-tab ng-if="!vm.grid.detailsConfig.isDetailsEditMode && vm.isPluginEditable(vm.grid.operatingItem())" label="{{ 'attribute.latest-telemetry' | translate }}">
 | 
			
		||||
            <tb-attribute-table flex
 | 
			
		||||
                                entity-id="vm.grid.operatingItem().id.id"
 | 
			
		||||
                                entity-type="{{vm.types.entityType.plugin}}"
 | 
			
		||||
@ -48,7 +48,7 @@
 | 
			
		||||
                                disable-attribute-scope-selection="true">
 | 
			
		||||
            </tb-attribute-table>
 | 
			
		||||
        </md-tab>
 | 
			
		||||
        <md-tab ng-if="!vm.grid.detailsConfig.isDetailsEditMode" label="{{ 'plugin.events' | translate }}">
 | 
			
		||||
        <md-tab ng-if="!vm.grid.detailsConfig.isDetailsEditMode && vm.isPluginEditable(vm.grid.operatingItem())" label="{{ 'plugin.events' | translate }}">
 | 
			
		||||
            <tb-event-table flex entity-type="vm.types.entityType.plugin"
 | 
			
		||||
                            entity-id="vm.grid.operatingItem().id.id"
 | 
			
		||||
                            tenant-id="vm.grid.operatingItem().tenantId.id"
 | 
			
		||||
@ -56,7 +56,7 @@
 | 
			
		||||
                            disabled-event-types="{{vm.types.eventType.alarm.value}}">
 | 
			
		||||
            </tb-event-table>
 | 
			
		||||
        </md-tab>
 | 
			
		||||
        <md-tab ng-if="!vm.grid.detailsConfig.isDetailsEditMode" label="{{ 'relation.relations' | translate }}">
 | 
			
		||||
        <md-tab ng-if="!vm.grid.detailsConfig.isDetailsEditMode && vm.isPluginEditable(vm.grid.operatingItem())" label="{{ 'relation.relations' | translate }}">
 | 
			
		||||
            <tb-relation-table flex
 | 
			
		||||
                               entity-id="vm.grid.operatingItem().id.id"
 | 
			
		||||
                               entity-type="{{vm.types.entityType.plugin}}">
 | 
			
		||||
 | 
			
		||||
@ -134,6 +134,8 @@ export default function RuleController(ruleService, userService, importExport, $
 | 
			
		||||
        vm.ruleGridConfig.topIndex = $stateParams.topIndex;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    vm.isRuleEditable = isRuleEditable;
 | 
			
		||||
 | 
			
		||||
    vm.activateRule = activateRule;
 | 
			
		||||
    vm.suspendRule = suspendRule;
 | 
			
		||||
    vm.exportRule = exportRule;
 | 
			
		||||
 | 
			
		||||
@ -19,7 +19,7 @@
 | 
			
		||||
    <details-buttons tb-help="'rules'" help-container-id="help-container">
 | 
			
		||||
        <div id="help-container"></div>
 | 
			
		||||
    </details-buttons>
 | 
			
		||||
    <md-tabs ng-class="{'tb-headless': vm.grid.detailsConfig.isDetailsEditMode}"
 | 
			
		||||
    <md-tabs ng-class="{'tb-headless': (vm.grid.detailsConfig.isDetailsEditMode || !vm.isRuleEditable(vm.grid.operatingItem()))}"
 | 
			
		||||
             id="tabs" md-border-bottom flex class="tb-absolute-fill">
 | 
			
		||||
        <md-tab label="{{ 'rule.details' | translate }}">
 | 
			
		||||
            <tb-rule rule="vm.grid.operatingItem()"
 | 
			
		||||
@ -31,7 +31,7 @@
 | 
			
		||||
                               on-export-rule="vm.exportRule(event, vm.grid.detailsConfig.currentItem)"
 | 
			
		||||
                               on-delete-rule="vm.grid.deleteItem(event, vm.grid.detailsConfig.currentItem)"></tb-rule>
 | 
			
		||||
        </md-tab>
 | 
			
		||||
        <md-tab ng-if="!vm.grid.detailsConfig.isDetailsEditMode" label="{{ 'attribute.attributes' | translate }}">
 | 
			
		||||
        <md-tab ng-if="!vm.grid.detailsConfig.isDetailsEditMode && vm.isRuleEditable(vm.grid.operatingItem())" label="{{ 'attribute.attributes' | translate }}">
 | 
			
		||||
            <tb-attribute-table flex
 | 
			
		||||
                                entity-id="vm.grid.operatingItem().id.id"
 | 
			
		||||
                                entity-type="{{vm.types.entityType.rule}}"
 | 
			
		||||
@ -39,7 +39,7 @@
 | 
			
		||||
                                default-attribute-scope="{{vm.types.attributesScope.server.value}}">
 | 
			
		||||
            </tb-attribute-table>
 | 
			
		||||
        </md-tab>
 | 
			
		||||
        <md-tab ng-if="!vm.grid.detailsConfig.isDetailsEditMode" label="{{ 'attribute.latest-telemetry' | translate }}">
 | 
			
		||||
        <md-tab ng-if="!vm.grid.detailsConfig.isDetailsEditMode && vm.isRuleEditable(vm.grid.operatingItem())" label="{{ 'attribute.latest-telemetry' | translate }}">
 | 
			
		||||
            <tb-attribute-table flex
 | 
			
		||||
                                entity-id="vm.grid.operatingItem().id.id"
 | 
			
		||||
                                entity-type="{{vm.types.entityType.rule}}"
 | 
			
		||||
@ -48,7 +48,7 @@
 | 
			
		||||
                                disable-attribute-scope-selection="true">
 | 
			
		||||
            </tb-attribute-table>
 | 
			
		||||
        </md-tab>
 | 
			
		||||
        <md-tab ng-if="!vm.grid.detailsConfig.isDetailsEditMode" label="{{ 'rule.events' | translate }}">
 | 
			
		||||
        <md-tab ng-if="!vm.grid.detailsConfig.isDetailsEditMode && vm.isRuleEditable(vm.grid.operatingItem())" label="{{ 'rule.events' | translate }}">
 | 
			
		||||
            <tb-event-table flex entity-type="vm.types.entityType.rule"
 | 
			
		||||
                            entity-id="vm.grid.operatingItem().id.id"
 | 
			
		||||
                            tenant-id="vm.grid.operatingItem().tenantId.id"
 | 
			
		||||
@ -56,7 +56,7 @@
 | 
			
		||||
                            disabled-event-types="{{vm.types.eventType.alarm.value}}">
 | 
			
		||||
            </tb-event-table>
 | 
			
		||||
        </md-tab>
 | 
			
		||||
        <md-tab ng-if="!vm.grid.detailsConfig.isDetailsEditMode" label="{{ 'relation.relations' | translate }}">
 | 
			
		||||
        <md-tab ng-if="!vm.grid.detailsConfig.isDetailsEditMode && vm.isRuleEditable(vm.grid.operatingItem())" label="{{ 'relation.relations' | translate }}">
 | 
			
		||||
            <tb-relation-table flex
 | 
			
		||||
                               entity-id="vm.grid.operatingItem().id.id"
 | 
			
		||||
                               entity-type="{{vm.types.entityType.rule}}">
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user