Merge pull request #1198 from dmytro-landiak/feature/delete-timeseries
delete timeseries fixes and controller, entityView findByName added
This commit is contained in:
		
						commit
						b42b99daaf
					
				@ -29,7 +29,6 @@ import org.springframework.web.bind.annotation.RequestParam;
 | 
			
		||||
import org.springframework.web.bind.annotation.ResponseBody;
 | 
			
		||||
import org.springframework.web.bind.annotation.ResponseStatus;
 | 
			
		||||
import org.springframework.web.bind.annotation.RestController;
 | 
			
		||||
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
 | 
			
		||||
import org.thingsboard.server.common.data.Customer;
 | 
			
		||||
import org.thingsboard.server.common.data.DataConstants;
 | 
			
		||||
import org.thingsboard.server.common.data.EntitySubtype;
 | 
			
		||||
@ -39,7 +38,6 @@ import org.thingsboard.server.common.data.audit.ActionType;
 | 
			
		||||
import org.thingsboard.server.common.data.entityview.EntityViewSearchQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CustomerId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityViewId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
@ -47,7 +45,6 @@ import org.thingsboard.server.common.data.id.UUIDBased;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.page.TextPageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.TextPageLink;
 | 
			
		||||
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 | 
			
		||||
import org.thingsboard.server.dao.exception.IncorrectParameterException;
 | 
			
		||||
import org.thingsboard.server.dao.model.ModelConstants;
 | 
			
		||||
import org.thingsboard.server.service.security.model.SecurityUser;
 | 
			
		||||
@ -174,7 +171,7 @@ public class EntityViewController extends BaseController {
 | 
			
		||||
            EntityView entityView = checkEntityViewId(entityViewId);
 | 
			
		||||
            entityViewService.deleteEntityView(entityViewId);
 | 
			
		||||
            logEntityAction(entityViewId, entityView, entityView.getCustomerId(),
 | 
			
		||||
                    ActionType.DELETED,null, strEntityViewId);
 | 
			
		||||
                    ActionType.DELETED, null, strEntityViewId);
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            logEntityAction(emptyId(EntityType.ENTITY_VIEW),
 | 
			
		||||
                    null,
 | 
			
		||||
@ -184,11 +181,24 @@ public class EntityViewController extends BaseController {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreAuthorize("hasAuthority('TENANT_ADMIN')")
 | 
			
		||||
    @RequestMapping(value = "/tenant/entityViews", params = {"entityViewName"}, method = RequestMethod.GET)
 | 
			
		||||
    @ResponseBody
 | 
			
		||||
    public EntityView getTenantEntityView(
 | 
			
		||||
            @RequestParam String entityViewName) throws ThingsboardException {
 | 
			
		||||
        try {
 | 
			
		||||
            TenantId tenantId = getCurrentUser().getTenantId();
 | 
			
		||||
            return checkNotNull(entityViewService.findEntityViewByTenantIdAndName(tenantId, entityViewName));
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            throw handleException(e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreAuthorize("hasAuthority('TENANT_ADMIN')")
 | 
			
		||||
    @RequestMapping(value = "/customer/{customerId}/entityView/{entityViewId}", method = RequestMethod.POST)
 | 
			
		||||
    @ResponseBody
 | 
			
		||||
    public EntityView assignEntityViewToCustomer(@PathVariable(CUSTOMER_ID) String strCustomerId,
 | 
			
		||||
                                             @PathVariable(ENTITY_VIEW_ID) String strEntityViewId) throws ThingsboardException {
 | 
			
		||||
                                                 @PathVariable(ENTITY_VIEW_ID) String strEntityViewId) throws ThingsboardException {
 | 
			
		||||
        checkParameter(CUSTOMER_ID, strCustomerId);
 | 
			
		||||
        checkParameter(ENTITY_VIEW_ID, strEntityViewId);
 | 
			
		||||
        try {
 | 
			
		||||
 | 
			
		||||
@ -49,9 +49,11 @@ import org.thingsboard.server.common.data.kv.Aggregation;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.AttributeKey;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.KvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
 | 
			
		||||
@ -60,12 +62,10 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 | 
			
		||||
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
 | 
			
		||||
import org.thingsboard.server.dao.attributes.AttributesService;
 | 
			
		||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
 | 
			
		||||
import org.thingsboard.server.service.security.AccessValidator;
 | 
			
		||||
import org.thingsboard.server.service.security.model.SecurityUser;
 | 
			
		||||
import org.thingsboard.server.service.telemetry.AttributeData;
 | 
			
		||||
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
 | 
			
		||||
import org.thingsboard.server.service.telemetry.TsData;
 | 
			
		||||
import org.thingsboard.server.service.telemetry.exception.InvalidParametersException;
 | 
			
		||||
import org.thingsboard.server.service.telemetry.exception.UncheckedApiException;
 | 
			
		||||
@ -249,6 +249,60 @@ public class TelemetryController extends BaseController {
 | 
			
		||||
        return saveTelemetry(entityId, requestBody, ttl);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
 | 
			
		||||
    @RequestMapping(value = "/{entityType}/{entityId}/timeseries/delete", method = RequestMethod.DELETE)
 | 
			
		||||
    @ResponseBody
 | 
			
		||||
    public DeferredResult<ResponseEntity> deleteEntityTimeseries(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
 | 
			
		||||
                                                                 @RequestParam(name = "keys") String keysStr,
 | 
			
		||||
                                                                 @RequestParam(name = "deleteAllDataForKeys", defaultValue = "false") boolean deleteAllDataForKeys,
 | 
			
		||||
                                                                 @RequestParam(name = "startTs", required = false) Long startTs,
 | 
			
		||||
                                                                 @RequestParam(name = "endTs", required = false) Long endTs,
 | 
			
		||||
                                                                 @RequestParam(name = "rewriteLatestIfDeleted", defaultValue = "false") boolean rewriteLatestIfDeleted) throws ThingsboardException {
 | 
			
		||||
        EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
 | 
			
		||||
        return deleteTimeseries(entityId, keysStr, deleteAllDataForKeys, startTs, endTs, rewriteLatestIfDeleted);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private DeferredResult<ResponseEntity> deleteTimeseries(EntityId entityIdStr, String keysStr, boolean deleteAllDataForKeys,
 | 
			
		||||
                                                            Long startTs, Long endTs, boolean rewriteLatestIfDeleted) throws ThingsboardException {
 | 
			
		||||
        List<String> keys = toKeysList(keysStr);
 | 
			
		||||
        if (keys.isEmpty()) {
 | 
			
		||||
            return getImmediateDeferredResult("Empty keys: " + keysStr, HttpStatus.BAD_REQUEST);
 | 
			
		||||
        }
 | 
			
		||||
        SecurityUser user = getCurrentUser();
 | 
			
		||||
 | 
			
		||||
        long deleteFromTs;
 | 
			
		||||
        long deleteToTs;
 | 
			
		||||
        if (deleteAllDataForKeys) {
 | 
			
		||||
            deleteFromTs = 0L;
 | 
			
		||||
            deleteToTs = System.currentTimeMillis();
 | 
			
		||||
        } else {
 | 
			
		||||
            deleteFromTs = startTs;
 | 
			
		||||
            deleteToTs = endTs;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return accessValidator.validateEntityAndCallback(user, entityIdStr, (result, entityId) -> {
 | 
			
		||||
            List<DeleteTsKvQuery> deleteTsKvQueries = new ArrayList<>();
 | 
			
		||||
            for (String key : keys) {
 | 
			
		||||
                deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted));
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            ListenableFuture<List<Void>> future = tsService.remove(entityId, deleteTsKvQueries);
 | 
			
		||||
            Futures.addCallback(future, new FutureCallback<List<Void>>() {
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onSuccess(@Nullable List<Void> tmp) {
 | 
			
		||||
                    logTimeseriesDeleted(user, entityId, keys, null);
 | 
			
		||||
                    result.setResult(new ResponseEntity<>(HttpStatus.OK));
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onFailure(Throwable t) {
 | 
			
		||||
                    logTimeseriesDeleted(user, entityId, keys, t);
 | 
			
		||||
                    result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
 | 
			
		||||
                }
 | 
			
		||||
            }, executor);
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
 | 
			
		||||
    @RequestMapping(value = "/{deviceId}/{scope}", method = RequestMethod.DELETE)
 | 
			
		||||
    @ResponseBody
 | 
			
		||||
@ -506,6 +560,15 @@ public class TelemetryController extends BaseController {
 | 
			
		||||
        };
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void logTimeseriesDeleted(SecurityUser user, EntityId entityId, List<String> keys, Throwable e) {
 | 
			
		||||
        try {
 | 
			
		||||
            logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.TIMESERIES_DELETED, toException(e),
 | 
			
		||||
                    keys);
 | 
			
		||||
        } catch (ThingsboardException te) {
 | 
			
		||||
            log.warn("Failed to log timeseries delete", te);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void logAttributesDeleted(SecurityUser user, EntityId entityId, String scope, List<String> keys, Throwable e) {
 | 
			
		||||
        try {
 | 
			
		||||
            logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.ATTRIBUTES_DELETED, toException(e),
 | 
			
		||||
 | 
			
		||||
@ -24,6 +24,7 @@ public enum ActionType {
 | 
			
		||||
    UPDATED(false), // log entity
 | 
			
		||||
    ATTRIBUTES_UPDATED(false), // log attributes/values
 | 
			
		||||
    ATTRIBUTES_DELETED(false), // log attributes
 | 
			
		||||
    TIMESERIES_DELETED(false), // log timeseries
 | 
			
		||||
    RPC_CALL(false), // log method and params
 | 
			
		||||
    CREDENTIALS_UPDATED(false), // log new credentials
 | 
			
		||||
    ASSIGNED_TO_CUSTOMER(false), // log customer name
 | 
			
		||||
@ -32,11 +33,11 @@ public enum ActionType {
 | 
			
		||||
    SUSPENDED(false), // log string id
 | 
			
		||||
    CREDENTIALS_READ(true), // log device id
 | 
			
		||||
    ATTRIBUTES_READ(true), // log attributes
 | 
			
		||||
    RELATION_ADD_OR_UPDATE (false),
 | 
			
		||||
    RELATION_DELETED (false),
 | 
			
		||||
    RELATIONS_DELETED (false),
 | 
			
		||||
    ALARM_ACK (false),
 | 
			
		||||
    ALARM_CLEAR (false);
 | 
			
		||||
    RELATION_ADD_OR_UPDATE(false),
 | 
			
		||||
    RELATION_DELETED(false),
 | 
			
		||||
    RELATIONS_DELETED(false),
 | 
			
		||||
    ALARM_ACK(false),
 | 
			
		||||
    ALARM_CLEAR(false);
 | 
			
		||||
 | 
			
		||||
    private final boolean isRead;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -43,6 +43,8 @@ public interface EntityViewService {
 | 
			
		||||
 | 
			
		||||
    EntityView findEntityViewById(EntityViewId entityViewId);
 | 
			
		||||
 | 
			
		||||
    EntityView findEntityViewByTenantIdAndName(TenantId tenantId, String name);
 | 
			
		||||
 | 
			
		||||
    TextPageData<EntityView> findEntityViewByTenantId(TenantId tenantId, TextPageLink pageLink);
 | 
			
		||||
 | 
			
		||||
    TextPageData<EntityView> findEntityViewByTenantIdAndType(TenantId tenantId, TextPageLink pageLink, String type);
 | 
			
		||||
 | 
			
		||||
@ -29,8 +29,6 @@ import org.springframework.cache.annotation.Cacheable;
 | 
			
		||||
import org.springframework.cache.annotation.Caching;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.common.data.Customer;
 | 
			
		||||
import org.thingsboard.server.common.data.DataConstants;
 | 
			
		||||
import org.thingsboard.server.common.data.Device;
 | 
			
		||||
import org.thingsboard.server.common.data.EntitySubtype;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityView;
 | 
			
		||||
@ -40,12 +38,10 @@ import org.thingsboard.server.common.data.id.CustomerId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityViewId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.page.TextPageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.TextPageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.EntityRelation;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
 | 
			
		||||
import org.thingsboard.server.dao.attributes.AttributesService;
 | 
			
		||||
import org.thingsboard.server.dao.customer.CustomerDao;
 | 
			
		||||
import org.thingsboard.server.dao.entity.AbstractEntityService;
 | 
			
		||||
import org.thingsboard.server.dao.exception.DataValidationException;
 | 
			
		||||
@ -56,15 +52,13 @@ import org.thingsboard.server.dao.tenant.TenantDao;
 | 
			
		||||
import javax.annotation.Nullable;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Arrays;
 | 
			
		||||
import java.util.Collection;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.Comparator;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.server.common.data.CacheConstants.ENTITY_VIEW_CACHE;
 | 
			
		||||
import static org.thingsboard.server.common.data.CacheConstants.RELATIONS_CACHE;
 | 
			
		||||
import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
 | 
			
		||||
import static org.thingsboard.server.dao.service.Validator.validateId;
 | 
			
		||||
import static org.thingsboard.server.dao.service.Validator.validatePageLink;
 | 
			
		||||
@ -96,6 +90,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
 | 
			
		||||
 | 
			
		||||
    @Caching(evict = {
 | 
			
		||||
            @CacheEvict(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityView.tenantId, #entityView.entityId}"),
 | 
			
		||||
            @CacheEvict(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityView.tenantId, #entityView.name}"),
 | 
			
		||||
            @CacheEvict(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityView.id}")})
 | 
			
		||||
    @Override
 | 
			
		||||
    public EntityView saveEntityView(EntityView entityView) {
 | 
			
		||||
@ -137,6 +132,15 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
 | 
			
		||||
        return entityViewDao.findById(entityViewId.getId());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Cacheable(cacheNames = ENTITY_VIEW_CACHE, key = "{#tenantId, #name}")
 | 
			
		||||
    @Override
 | 
			
		||||
    public EntityView findEntityViewByTenantIdAndName(TenantId tenantId, String name) {
 | 
			
		||||
        log.trace("Executing findEntityViewByTenantIdAndName [{}][{}]", tenantId, name);
 | 
			
		||||
        validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
 | 
			
		||||
        Optional<EntityView> entityViewOpt = entityViewDao.findEntityViewByTenantIdAndName(tenantId.getId(), name);
 | 
			
		||||
        return entityViewOpt.orElse(null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TextPageData<EntityView> findEntityViewByTenantId(TenantId tenantId, TextPageLink pageLink) {
 | 
			
		||||
        log.trace("Executing findEntityViewsByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink);
 | 
			
		||||
@ -255,6 +259,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
 | 
			
		||||
        deleteEntityRelations(entityViewId);
 | 
			
		||||
        EntityView entityView = entityViewDao.findById(entityViewId.getId());
 | 
			
		||||
        cacheManager.getCache(ENTITY_VIEW_CACHE).evict(Arrays.asList(entityView.getTenantId(), entityView.getEntityId()));
 | 
			
		||||
        cacheManager.getCache(ENTITY_VIEW_CACHE).evict(Arrays.asList(entityView.getTenantId(), entityView.getName()));
 | 
			
		||||
        entityViewDao.removeById(entityViewId.getId());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql.timeseries;
 | 
			
		||||
 | 
			
		||||
import com.google.common.base.Function;
 | 
			
		||||
import com.google.common.collect.Lists;
 | 
			
		||||
import com.google.common.util.concurrent.FutureCallback;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.ListeningExecutorService;
 | 
			
		||||
@ -31,6 +32,7 @@ import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.data.UUIDConverter;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.Aggregation;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
 | 
			
		||||
@ -41,9 +43,9 @@ import org.thingsboard.server.dao.model.sql.TsKvEntity;
 | 
			
		||||
import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey;
 | 
			
		||||
import org.thingsboard.server.dao.model.sql.TsKvLatestEntity;
 | 
			
		||||
import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
 | 
			
		||||
import org.thingsboard.server.dao.timeseries.SimpleListenableFuture;
 | 
			
		||||
import org.thingsboard.server.dao.timeseries.TimeseriesDao;
 | 
			
		||||
import org.thingsboard.server.dao.timeseries.TsInsertExecutorType;
 | 
			
		||||
import org.thingsboard.server.dao.util.SqlDao;
 | 
			
		||||
import org.thingsboard.server.dao.util.SqlTsDao;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.Nullable;
 | 
			
		||||
@ -53,6 +55,7 @@ import java.util.ArrayList;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.concurrent.CompletableFuture;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
@ -64,6 +67,8 @@ import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID;
 | 
			
		||||
@SqlTsDao
 | 
			
		||||
public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService implements TimeseriesDao {
 | 
			
		||||
 | 
			
		||||
    private static final String DESC_ORDER = "DESC";
 | 
			
		||||
 | 
			
		||||
    @Value("${sql.ts_inserts_executor_type}")
 | 
			
		||||
    private String insertExecutorType;
 | 
			
		||||
 | 
			
		||||
@ -326,14 +331,72 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query) {
 | 
			
		||||
        TsKvLatestEntity latestEntity = new TsKvLatestEntity();
 | 
			
		||||
        latestEntity.setEntityType(entityId.getEntityType());
 | 
			
		||||
        latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
 | 
			
		||||
        latestEntity.setKey(query.getKey());
 | 
			
		||||
        return service.submit(() -> {
 | 
			
		||||
            tsKvLatestRepository.delete(latestEntity);
 | 
			
		||||
            return null;
 | 
			
		||||
        ListenableFuture<TsKvEntry> latestFuture = findLatest(entityId, query.getKey());
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<Boolean> booleanFuture = Futures.transform(latestFuture, tsKvEntry -> {
 | 
			
		||||
            long ts = tsKvEntry.getTs();
 | 
			
		||||
            return ts > query.getStartTs() && ts <= query.getEndTs();
 | 
			
		||||
        }, service);
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
 | 
			
		||||
            if (isRemove) {
 | 
			
		||||
                TsKvLatestEntity latestEntity = new TsKvLatestEntity();
 | 
			
		||||
                latestEntity.setEntityType(entityId.getEntityType());
 | 
			
		||||
                latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
 | 
			
		||||
                latestEntity.setKey(query.getKey());
 | 
			
		||||
                return service.submit(() -> {
 | 
			
		||||
                    tsKvLatestRepository.delete(latestEntity);
 | 
			
		||||
                    return null;
 | 
			
		||||
                });
 | 
			
		||||
            }
 | 
			
		||||
            return Futures.immediateFuture(null);
 | 
			
		||||
        }, service);
 | 
			
		||||
 | 
			
		||||
        final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
 | 
			
		||||
        Futures.addCallback(removedLatestFuture, new FutureCallback<Void>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(@Nullable Void result) {
 | 
			
		||||
                if (query.getRewriteLatestIfDeleted()) {
 | 
			
		||||
                    ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
 | 
			
		||||
                        if (isRemove) {
 | 
			
		||||
                            return getNewLatestEntryFuture(entityId, query);
 | 
			
		||||
                        }
 | 
			
		||||
                        return Futures.immediateFuture(null);
 | 
			
		||||
                    }, service);
 | 
			
		||||
 | 
			
		||||
                    try {
 | 
			
		||||
                        resultFuture.set(savedLatestFuture.get());
 | 
			
		||||
                    } catch (InterruptedException | ExecutionException e) {
 | 
			
		||||
                        log.warn("Could not get latest saved value for [{}], {}", entityId, query.getKey(), e);
 | 
			
		||||
                    }
 | 
			
		||||
                } else {
 | 
			
		||||
                    resultFuture.set(null);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onFailure(Throwable t) {
 | 
			
		||||
                log.warn("[{}] Failed to process remove of the latest value", entityId, t);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
        return resultFuture;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) {
 | 
			
		||||
        long startTs = 0;
 | 
			
		||||
        long endTs = query.getStartTs() - 1;
 | 
			
		||||
        ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
 | 
			
		||||
                Aggregation.NONE, DESC_ORDER);
 | 
			
		||||
        ListenableFuture<List<TsKvEntry>> future = findAllAsync(entityId, findNewLatestQuery);
 | 
			
		||||
 | 
			
		||||
        return Futures.transformAsync(future, entryList -> {
 | 
			
		||||
            if (entryList.size() == 1) {
 | 
			
		||||
                return saveLatest(entityId, entryList.get(0));
 | 
			
		||||
            } else {
 | 
			
		||||
                log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
 | 
			
		||||
            }
 | 
			
		||||
            return Futures.immediateFuture(null);
 | 
			
		||||
        }, service);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -47,7 +47,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
 | 
			
		||||
    @Modifying
 | 
			
		||||
    @Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
 | 
			
		||||
            "AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
 | 
			
		||||
            "AND tskv.ts > :startTs AND tskv.ts < :endTs")
 | 
			
		||||
            "AND tskv.ts > :startTs AND tskv.ts <= :endTs")
 | 
			
		||||
    void delete(@Param("entityId") String entityId,
 | 
			
		||||
                @Param("entityType") EntityType entityType,
 | 
			
		||||
                @Param("entityKey") String key,
 | 
			
		||||
 | 
			
		||||
@ -48,7 +48,6 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
 | 
			
		||||
import org.thingsboard.server.dao.model.ModelConstants;
 | 
			
		||||
import org.thingsboard.server.dao.nosql.CassandraAbstractAsyncDao;
 | 
			
		||||
import org.thingsboard.server.dao.util.NoSqlDao;
 | 
			
		||||
import org.thingsboard.server.dao.util.NoSqlTsDao;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.Nullable;
 | 
			
		||||
@ -62,6 +61,7 @@ import java.util.Arrays;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 | 
			
		||||
@ -434,14 +434,14 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
    public ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query) {
 | 
			
		||||
        ListenableFuture<TsKvEntry> latestEntryFuture = findLatest(entityId, query.getKey());
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<Boolean> booleanFuture = Futures.transformAsync(latestEntryFuture, latestEntry -> {
 | 
			
		||||
        ListenableFuture<Boolean> booleanFuture = Futures.transform(latestEntryFuture, latestEntry -> {
 | 
			
		||||
            long ts = latestEntry.getTs();
 | 
			
		||||
            if (ts >= query.getStartTs() && ts <= query.getEndTs()) {
 | 
			
		||||
                return Futures.immediateFuture(true);
 | 
			
		||||
            if (ts > query.getStartTs() && ts <= query.getEndTs()) {
 | 
			
		||||
                return true;
 | 
			
		||||
            } else {
 | 
			
		||||
                log.trace("Won't be deleted latest value for [{}], key - {}", entityId, query.getKey());
 | 
			
		||||
            }
 | 
			
		||||
            return Futures.immediateFuture(false);
 | 
			
		||||
            return false;
 | 
			
		||||
        }, readResultsProcessingExecutor);
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
 | 
			
		||||
@ -451,18 +451,34 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 | 
			
		||||
            return Futures.immediateFuture(null);
 | 
			
		||||
        }, readResultsProcessingExecutor);
 | 
			
		||||
 | 
			
		||||
        if (query.getRewriteLatestIfDeleted()) {
 | 
			
		||||
            ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
 | 
			
		||||
                if (isRemove) {
 | 
			
		||||
                    return getNewLatestEntryFuture(entityId, query);
 | 
			
		||||
                }
 | 
			
		||||
                return Futures.immediateFuture(null);
 | 
			
		||||
            }, readResultsProcessingExecutor);
 | 
			
		||||
        final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
 | 
			
		||||
        Futures.addCallback(removedLatestFuture, new FutureCallback<Void>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(@Nullable Void result) {
 | 
			
		||||
                if (query.getRewriteLatestIfDeleted()) {
 | 
			
		||||
                    ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
 | 
			
		||||
                        if (isRemove) {
 | 
			
		||||
                            return getNewLatestEntryFuture(entityId, query);
 | 
			
		||||
                        }
 | 
			
		||||
                        return Futures.immediateFuture(null);
 | 
			
		||||
                    }, readResultsProcessingExecutor);
 | 
			
		||||
 | 
			
		||||
            return Futures.transformAsync(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)),
 | 
			
		||||
                    list -> Futures.immediateFuture(null), readResultsProcessingExecutor);
 | 
			
		||||
        }
 | 
			
		||||
        return removedLatestFuture;
 | 
			
		||||
                    try {
 | 
			
		||||
                        resultFuture.set(savedLatestFuture.get());
 | 
			
		||||
                    } catch (InterruptedException | ExecutionException e) {
 | 
			
		||||
                        log.warn("Could not get latest saved value for [{}], {}", entityId, query.getKey(), e);
 | 
			
		||||
                    }
 | 
			
		||||
                } else {
 | 
			
		||||
                    resultFuture.set(null);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onFailure(Throwable t) {
 | 
			
		||||
                log.warn("[{}] Failed to process remove of the latest value", entityId, t);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
        return resultFuture;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) {
 | 
			
		||||
 | 
			
		||||
@ -152,7 +152,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testDeleteDeviceTsData() throws Exception {
 | 
			
		||||
    public void testDeleteDeviceTsDataWithoutOverwritingLatest() throws Exception {
 | 
			
		||||
        DeviceId deviceId = new DeviceId(UUIDs.timeBased());
 | 
			
		||||
 | 
			
		||||
        saveEntries(deviceId, 10000);
 | 
			
		||||
@ -171,6 +171,26 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
 | 
			
		||||
        Assert.assertEquals(null, latest.get(0).getValueAsString());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testDeleteDeviceTsDataWithOverwritingLatest() throws Exception {
 | 
			
		||||
        DeviceId deviceId = new DeviceId(UUIDs.timeBased());
 | 
			
		||||
 | 
			
		||||
        saveEntries(deviceId, 10000);
 | 
			
		||||
        saveEntries(deviceId, 20000);
 | 
			
		||||
        saveEntries(deviceId, 30000);
 | 
			
		||||
        saveEntries(deviceId, 40000);
 | 
			
		||||
 | 
			
		||||
        tsService.remove(deviceId, Collections.singletonList(
 | 
			
		||||
                new BaseDeleteTsKvQuery(STRING_KEY, 25000, 45000, true))).get();
 | 
			
		||||
 | 
			
		||||
        List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(
 | 
			
		||||
                new BaseReadTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE))).get();
 | 
			
		||||
        Assert.assertEquals(2, list.size());
 | 
			
		||||
 | 
			
		||||
        List<TsKvEntry> latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get();
 | 
			
		||||
        Assert.assertEquals(20000, latest.get(0).getTs());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testFindDeviceTsData() throws Exception {
 | 
			
		||||
        DeviceId deviceId = new DeviceId(UUIDs.timeBased());
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user