diff --git a/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java b/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java index 7d021f3c1f..3590a10c86 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java @@ -19,7 +19,9 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.PathVariable; @@ -40,10 +42,14 @@ import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UUIDBased; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.model.ModelConstants; +import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.permission.Operation; @@ -69,6 +75,9 @@ public class EntityViewController extends BaseController { public static final String ENTITY_VIEW_ID = "entityViewId"; + @Autowired + private TimeseriesService tsService; + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") @RequestMapping(value = "/entityView/{entityViewId}", method = RequestMethod.GET) @ResponseBody @@ -101,16 +110,37 @@ public class EntityViewController extends BaseController { try { entityView.setTenantId(getCurrentUser().getTenantId()); - checkEntity(entityView.getId(), entityView, Resource.ENTITY_VIEW); + List> futures = new ArrayList<>(); + + if (entityView.getId() == null) { + accessControlService + .checkPermission(getCurrentUser(), Resource.ENTITY_VIEW, Operation.CREATE, null, entityView); + } else { + EntityView existingEntityView = checkNotNull(entityViewService.findEntityViewById(getCurrentUser().getTenantId(), entityView.getId())); + if (existingEntityView.getKeys() != null) { + if (existingEntityView.getKeys().getAttributes() != null) { + futures.add(deleteAttributesFromEntityView(existingEntityView, DataConstants.CLIENT_SCOPE, existingEntityView.getKeys().getAttributes().getCs(), getCurrentUser())); + futures.add(deleteAttributesFromEntityView(existingEntityView, DataConstants.SERVER_SCOPE, existingEntityView.getKeys().getAttributes().getCs(), getCurrentUser())); + futures.add(deleteAttributesFromEntityView(existingEntityView, DataConstants.SHARED_SCOPE, existingEntityView.getKeys().getAttributes().getCs(), getCurrentUser())); + } + if (existingEntityView.getKeys().getTimeseries() != null && !existingEntityView.getKeys().getTimeseries().isEmpty()) { + futures.add(deleteLatestFromEntityView(existingEntityView, existingEntityView.getKeys().getTimeseries(), getCurrentUser())); + } + } + } EntityView savedEntityView = checkNotNull(entityViewService.saveEntityView(entityView)); - List>> futures = new ArrayList<>(); - if (savedEntityView.getKeys() != null && savedEntityView.getKeys().getAttributes() != null) { - futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.CLIENT_SCOPE, savedEntityView.getKeys().getAttributes().getCs(), getCurrentUser())); - futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SERVER_SCOPE, savedEntityView.getKeys().getAttributes().getSs(), getCurrentUser())); - futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SHARED_SCOPE, savedEntityView.getKeys().getAttributes().getSh(), getCurrentUser())); + if (savedEntityView.getKeys() != null) { + if (savedEntityView.getKeys().getAttributes() != null) { + futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.CLIENT_SCOPE, savedEntityView.getKeys().getAttributes().getCs(), getCurrentUser())); + futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SERVER_SCOPE, savedEntityView.getKeys().getAttributes().getSs(), getCurrentUser())); + futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SHARED_SCOPE, savedEntityView.getKeys().getAttributes().getSh(), getCurrentUser())); + } + if (savedEntityView.getKeys().getTimeseries() != null && !savedEntityView.getKeys().getTimeseries().isEmpty()) { + futures.add(copyLatestFromEntityToEntityView(savedEntityView, getCurrentUser())); + } } - for (ListenableFuture> future : futures) { + for (ListenableFuture future : futures) { try { future.get(); } catch (InterruptedException | ExecutionException e) { @@ -128,6 +158,98 @@ public class EntityViewController extends BaseController { } } + private ListenableFuture deleteLatestFromEntityView(EntityView entityView, List keys, SecurityUser user) { + EntityViewId entityId = entityView.getId(); + SettableFuture resultFuture = SettableFuture.create(); + if (keys != null && !keys.isEmpty()) { + tsSubService.deleteLatest(entityView.getTenantId(), entityId, keys, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void tmp) { + try { + logTimeseriesDeleted(user, entityId, keys, null); + } catch (ThingsboardException e) { + log.error("Failed to log timeseries delete", e); + } + resultFuture.set(tmp); + } + + @Override + public void onFailure(Throwable t) { + try { + logTimeseriesDeleted(user, entityId, keys, t); + } catch (ThingsboardException e) { + log.error("Failed to log timeseries delete", e); + } + resultFuture.setException(t); + } + }); + } else { + resultFuture.set(null); + } + return resultFuture; + } + + private ListenableFuture deleteAttributesFromEntityView(EntityView entityView, String scope, List keys, SecurityUser user) { + EntityViewId entityId = entityView.getId(); + SettableFuture resultFuture = SettableFuture.create(); + if (keys != null && !keys.isEmpty()) { + tsSubService.deleteAndNotify(entityView.getTenantId(), entityId, scope, keys, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void tmp) { + try { + logAttributesDeleted(user, entityId, scope, keys, null); + } catch (ThingsboardException e) { + log.error("Failed to log attribute delete", e); + } + resultFuture.set(tmp); + } + + @Override + public void onFailure(Throwable t) { + try { + logAttributesDeleted(user, entityId, scope, keys, t); + } catch (ThingsboardException e) { + log.error("Failed to log attribute delete", e); + } + resultFuture.setException(t); + } + }); + } else { + resultFuture.set(null); + } + return resultFuture; + } + + private ListenableFuture> copyLatestFromEntityToEntityView(EntityView entityView, SecurityUser user) { + EntityViewId entityId = entityView.getId(); + List keys = entityView.getKeys().getTimeseries(); + long startTime = entityView.getStartTimeMs(); + long endTime = entityView.getEndTimeMs(); + ListenableFuture> latestFuture; + if (startTime == 0 && endTime == 0) { + latestFuture = tsService.findLatest(user.getTenantId(), entityView.getEntityId(), keys); + } else { + long startTs = startTime; + long endTs = endTime == 0 ? System.currentTimeMillis() : endTime; + List queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList()); + latestFuture = tsService.findAll(user.getTenantId(), entityView.getEntityId(), queries); + } + return Futures.transform(latestFuture, latestValues -> { + if (latestValues != null && !latestValues.isEmpty()) { + tsSubService.saveLatestAndNotify(entityView.getTenantId(), entityId, latestValues, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void tmp) { + } + + @Override + public void onFailure(Throwable t) { + } + }); + } + return null; + }, MoreExecutors.directExecutor()); + } + private ListenableFuture> copyAttributesFromEntityToEntityView(EntityView entityView, String scope, Collection keys, SecurityUser user) throws ThingsboardException { EntityViewId entityId = entityView.getId(); if (keys != null && !keys.isEmpty()) { @@ -174,10 +296,20 @@ public class EntityViewController extends BaseController { } private void logAttributesUpdated(SecurityUser user, EntityId entityId, String scope, List attributes, Throwable e) throws ThingsboardException { - logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.ATTRIBUTES_UPDATED, toException(e), + logEntityAction(user, entityId, null, null, ActionType.ATTRIBUTES_UPDATED, toException(e), scope, attributes); } + private void logAttributesDeleted(SecurityUser user, EntityId entityId, String scope, List keys, Throwable e) throws ThingsboardException { + logEntityAction(user, entityId, null, null, ActionType.ATTRIBUTES_DELETED, toException(e), + scope, keys); + } + + private void logTimeseriesDeleted(SecurityUser user, EntityId entityId, List keys, Throwable e) throws ThingsboardException { + logEntityAction(user, entityId, null, null, ActionType.TIMESERIES_DELETED, toException(e), + keys); + } + @PreAuthorize("hasAuthority('TENANT_ADMIN')") @RequestMapping(value = "/entityView/{entityViewId}", method = RequestMethod.DELETE) @ResponseStatus(value = HttpStatus.OK) diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 7d80b115d5..73661f40e1 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -171,6 +171,7 @@ public class ThingsboardInstallService { case "3.0.1": log.info("Upgrading ThingsBoard from version 3.0.1 to 3.1.0 ..."); databaseEntitiesUpgradeService.upgradeDatabase("3.0.1"); + dataUpdateService.updateData("3.0.1"); log.info("Updating system data..."); systemDataLoaderService.updateSystemWidgets(); break; diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index 26b087535f..dd61dc0d0f 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -15,20 +15,38 @@ */ package org.thingsboard.server.service.install.update; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.SearchTextBased; import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.id.EntityViewId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UUIDBased; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.tenant.TenantService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.service.install.InstallScripts; +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + @Service @Profile("install") @Slf4j @@ -43,6 +61,12 @@ public class DefaultDataUpdateService implements DataUpdateService { @Autowired private InstallScripts installScripts; + @Autowired + private EntityViewService entityViewService; + + @Autowired + private TimeseriesService tsService; + @Override public void updateData(String fromVersion) throws Exception { switch (fromVersion) { @@ -50,6 +74,10 @@ public class DefaultDataUpdateService implements DataUpdateService { log.info("Updating data from version 1.4.0 to 2.0.0 ..."); tenantsDefaultRuleChainUpdater.updateEntities(null); break; + case "3.0.1": + log.info("Updating data from version 3.0.1 to 3.1.0 ..."); + tenantsEntityViewsUpdater.updateEntities(null); + break; default: throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); } @@ -76,4 +104,66 @@ public class DefaultDataUpdateService implements DataUpdateService { } }; -} \ No newline at end of file + private PaginatedUpdater tenantsEntityViewsUpdater = + new PaginatedUpdater() { + + @Override + protected PageData findEntities(String region, PageLink pageLink) { + return tenantService.findTenants(pageLink); + } + + @Override + protected void updateEntity(Tenant tenant) { + updateTenantEntityViews(tenant.getId()); + } + }; + + private void updateTenantEntityViews(TenantId tenantId) { + PageLink pageLink = new PageLink(100); + PageData pageData = entityViewService.findEntityViewByTenantId(tenantId, pageLink); + boolean hasNext = true; + while (hasNext) { + List>> updateFutures = new ArrayList<>(); + for (EntityView entityView : pageData.getData()) { + updateFutures.add(updateEntityViewLatestTelemetry(entityView)); + } + + try { + Futures.allAsList(updateFutures).get(); + } catch (InterruptedException | ExecutionException e) { + log.error("Failed to copy latest telemetry to entity view", e); + } + + if (pageData.hasNext()) { + pageLink = pageLink.nextPageLink(); + pageData = entityViewService.findEntityViewByTenantId(tenantId, pageLink); + } else { + hasNext = false; + } + } + } + + private ListenableFuture> updateEntityViewLatestTelemetry(EntityView entityView) { + EntityViewId entityId = entityView.getId(); + List keys = entityView.getKeys().getTimeseries(); + long startTime = entityView.getStartTimeMs(); + long endTime = entityView.getEndTimeMs(); + ListenableFuture> latestFuture; + if (startTime == 0 && endTime == 0) { + latestFuture = tsService.findLatest(TenantId.SYS_TENANT_ID, entityView.getEntityId(), keys); + } else { + long startTs = startTime; + long endTs = endTime == 0 ? System.currentTimeMillis() : endTime; + List queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList()); + latestFuture = tsService.findAll(TenantId.SYS_TENANT_ID, entityView.getEntityId(), queries); + } + return Futures.transformAsync(latestFuture, latestValues -> { + if (latestValues != null && !latestValues.isEmpty()) { + ListenableFuture> saveFuture = tsService.saveLatest(TenantId.SYS_TENANT_ID, entityId, latestValues); + return saveFuture; + } + return null; + }, MoreExecutors.directExecutor()); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index d40ce8bfa9..fcaf04293f 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -115,6 +115,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes)); } + @Override + public void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List ts, FutureCallback callback) { + ListenableFuture> saveFuture = tsService.saveLatest(tenantId, entityId, ts); + addMainCallback(saveFuture, callback); + addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts)); + } + @Override public void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List keys, FutureCallback callback) { ListenableFuture> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys); @@ -122,6 +129,12 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer addWsCallback(deleteFuture, success -> onAttributesDelete(tenantId, entityId, scope, keys)); } + @Override + public void deleteLatest(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback) { + ListenableFuture> deleteFuture = tsService.removeLatest(tenantId, entityId, keys); + addMainCallback(deleteFuture, callback); + } + @Override public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback callback) { saveAndNotify(tenantId, entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value) diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index 9b3a578f58..a490562ac9 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -40,5 +40,9 @@ public interface TimeseriesService { ListenableFuture> save(TenantId tenantId, EntityId entityId, List tsKvEntry, long ttl); + ListenableFuture> saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntry); + ListenableFuture> remove(TenantId tenantId, EntityId entityId, List queries); + + ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index b78393b0a4..3c21ace038 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; @@ -149,6 +150,18 @@ public class BaseTimeseriesService implements TimeseriesService { return Futures.allAsList(futures); } + @Override + public ListenableFuture> saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntries) { + List> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size()); + for (TsKvEntry tsKvEntry : tsKvEntries) { + if (tsKvEntry == null) { + throw new IncorrectParameterException("Key value entry can't be null"); + } + futures.add(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry)); + } + return Futures.allAsList(futures); + } + private void saveAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only"); @@ -188,6 +201,17 @@ public class BaseTimeseriesService implements TimeseriesService { return Futures.allAsList(futures); } + @Override + public ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys) { + validate(entityId); + List> futures = Lists.newArrayListWithExpectedSize(keys.size()); + for (String key : keys) { + DeleteTsKvQuery query = new BaseDeleteTsKvQuery(key, 0, System.currentTimeMillis(), false); + futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); + } + return Futures.allAsList(futures); + } + private void deleteAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, DeleteTsKvQuery query) { futures.add(timeseriesDao.remove(tenantId, entityId, query)); futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index 742dc5b507..8cb410bf19 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -36,6 +36,8 @@ public interface RuleEngineTelemetryService { void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List attributes, FutureCallback callback); + void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List ts, FutureCallback callback); + void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback callback); void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, String value, FutureCallback callback); @@ -46,5 +48,7 @@ public interface RuleEngineTelemetryService { void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List keys, FutureCallback callback); + void deleteLatest(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback); + }