diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 951ee760ca..51a5e4c96c 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -51,6 +51,7 @@ import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetService; +import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.audit.AuditLogService; import org.thingsboard.server.dao.customer.CustomerService; import org.thingsboard.server.dao.dashboard.DashboardService; @@ -70,6 +71,7 @@ import org.thingsboard.server.exception.ThingsboardErrorResponseHandler; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.state.DeviceStateService; +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import javax.mail.MessagingException; import javax.servlet.http.HttpServletRequest; @@ -143,6 +145,12 @@ public abstract class BaseController { @Autowired protected EntityViewService entityViewService; + @Autowired + protected TelemetrySubscriptionService tsSubService; + + @Autowired + protected AttributesService attributesService; + @ExceptionHandler(ThingsboardException.class) public void handleThingsboardException(ThingsboardException ex, HttpServletResponse response) { errorResponseHandler.handle(ex, response); diff --git a/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java b/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java index a1ba1c63f8..9b7ffd2d11 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java @@ -15,7 +15,10 @@ */ package org.thingsboard.server.controller; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.PathVariable; @@ -26,7 +29,9 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; +import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.common.data.Customer; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; @@ -34,15 +39,24 @@ import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.entityview.EntityViewSearchQuery; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.UUIDBased; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageLink; +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.service.security.model.SecurityUser; +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static org.thingsboard.server.controller.CustomerController.CUSTOMER_ID; @@ -52,6 +66,7 @@ import static org.thingsboard.server.controller.CustomerController.CUSTOMER_ID; */ @RestController @RequestMapping("/api") +@Slf4j public class EntityViewController extends BaseController { public static final String ENTITY_VIEW_ID = "entityViewId"; @@ -75,6 +90,20 @@ public class EntityViewController extends BaseController { try { entityView.setTenantId(getCurrentUser().getTenantId()); EntityView savedEntityView = checkNotNull(entityViewService.saveEntityView(entityView)); + List>> futures = new ArrayList<>(); + if (savedEntityView.getKeys() != null && savedEntityView.getKeys().getAttributes() != null) { + futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.CLIENT_SCOPE, savedEntityView.getKeys().getAttributes().getCs(), getCurrentUser())); + futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SERVER_SCOPE, savedEntityView.getKeys().getAttributes().getSs(), getCurrentUser())); + futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SHARED_SCOPE, savedEntityView.getKeys().getAttributes().getSh(), getCurrentUser())); + } + for (ListenableFuture> future : futures) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Failed to copy attributes to entity view", e); + } + } + logEntityAction(savedEntityView.getId(), savedEntityView, null, entityView.getId() == null ? ActionType.ADDED : ActionType.UPDATED, null); return savedEntityView; @@ -85,6 +114,56 @@ public class EntityViewController extends BaseController { } } + private ListenableFuture> copyAttributesFromEntityToEntityView(EntityView entityView, String scope, Collection keys, SecurityUser user) throws ThingsboardException { + EntityViewId entityId = entityView.getId(); + if (keys != null && !keys.isEmpty()) { + ListenableFuture> getAttrFuture = attributesService.find(entityView.getEntityId(), scope, keys); + return Futures.transform(getAttrFuture, attributeKvEntries -> { + List attributes; + if (attributeKvEntries != null && !attributeKvEntries.isEmpty()) { + attributes = + attributeKvEntries.stream() + .filter(attributeKvEntry -> { + long startTime = entityView.getStartTimeMs(); + long endTime = entityView.getEndTimeMs(); + long lastUpdateTs = attributeKvEntry.getLastUpdateTs(); + return startTime == 0 && endTime == 0 || + (endTime == 0 && startTime < lastUpdateTs) || + (startTime == 0 && endTime > lastUpdateTs) + ? true : startTime < lastUpdateTs && endTime > lastUpdateTs; + }).collect(Collectors.toList()); + tsSubService.saveAndNotify(entityId, scope, attributes, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void tmp) { + try { + logAttributesUpdated(user, entityId, scope, attributes, null); + } catch (ThingsboardException e) { + log.error("Failed to log attribute updates", e); + } + } + + @Override + public void onFailure(Throwable t) { + try { + logAttributesUpdated(user, entityId, scope, attributes, t); + } catch (ThingsboardException e) { + log.error("Failed to log attribute updates", e); + } + } + }); + } + return null; + }); + } else { + return Futures.immediateFuture(null); + } + } + + private void logAttributesUpdated(SecurityUser user, EntityId entityId, String scope, List attributes, Throwable e) throws ThingsboardException { + logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.ATTRIBUTES_UPDATED, toException(e), + scope, attributes); + } + @PreAuthorize("hasAuthority('TENANT_ADMIN')") @RequestMapping(value = "/entityView/{entityViewId}", method = RequestMethod.DELETE) @ResponseStatus(value = HttpStatus.OK) diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index 7062ca97f9..9f92019550 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -93,15 +93,9 @@ import java.util.stream.Collectors; @Slf4j public class TelemetryController extends BaseController { - @Autowired - private AttributesService attributesService; - @Autowired private TimeseriesService tsService; - @Autowired - private TelemetrySubscriptionService tsSubService; - @Autowired private AccessValidator accessValidator; diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index bcca43ca5e..3b0deea78a 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -143,7 +143,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) { long startTime = 0L; long endTime = 0L; - if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { + if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW) && TelemetryFeature.TIMESERIES.equals(sub.getType())) { EntityView entityView = entityViewService.findEntityViewById(new EntityViewId(entityId.getId())); entityId = entityView.getEntityId(); startTime = entityView.getStartTimeMs(); @@ -165,38 +165,15 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio } private SubscriptionState getUpdatedSubscriptionState(EntityId entityId, SubscriptionState sub, EntityView entityView) { - boolean allKeys; Map keyStates; - if (sub.getType().equals(TelemetryFeature.TIMESERIES) && !entityView.getKeys().getTimeseries().isEmpty()) { - allKeys = false; + if(sub.isAllKeys()) { + keyStates = entityView.getKeys().getTimeseries().stream().collect(Collectors.toMap(k -> k, k -> 0L)); + } else { keyStates = sub.getKeyStates().entrySet() .stream().filter(entry -> entityView.getKeys().getTimeseries().contains(entry.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } else if (sub.getType().equals(TelemetryFeature.ATTRIBUTES)) { - if (sub.getScope().equals(DataConstants.CLIENT_SCOPE) && !entityView.getKeys().getAttributes().getCs().isEmpty()) { - allKeys = false; - keyStates = filterMap(sub, entityView.getKeys().getAttributes().getCs()); - } else if (sub.getScope().equals(DataConstants.SERVER_SCOPE) && !entityView.getKeys().getAttributes().getSs().isEmpty()) { - allKeys = false; - keyStates = filterMap(sub, entityView.getKeys().getAttributes().getSs()); - } else if (sub.getScope().equals(DataConstants.SERVER_SCOPE) && !entityView.getKeys().getAttributes().getSh().isEmpty()) { - allKeys = false; - keyStates = filterMap(sub, entityView.getKeys().getAttributes().getSh()); - } else { - allKeys = sub.isAllKeys(); - keyStates = sub.getKeyStates(); - } - } else { - allKeys = sub.isAllKeys(); - keyStates = sub.getKeyStates(); } - return new SubscriptionState(sub.getWsSessionId(), sub.getSubscriptionId(), entityId, sub.getType(), allKeys, keyStates, sub.getScope()); - } - - private Map filterMap(SubscriptionState sub, List allowedKeys) { - return sub.getKeyStates().entrySet() - .stream().filter(entry -> allowedKeys.contains(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + return new SubscriptionState(sub.getWsSessionId(), sub.getSubscriptionId(), entityId, sub.getType(), false, keyStates, sub.getScope()); } @Override @@ -467,7 +444,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio onLocalSubUpdate(entityId, s -> TelemetryFeature.ATTRIBUTES == s.getType() && (StringUtils.isEmpty(s.getScope()) || scope.equals(s.getScope())), s -> { List subscriptionUpdate = null; for (AttributeKvEntry kv : attributes) { - if (isInTimeRange(s, kv.getLastUpdateTs()) && (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey()))) { + if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) { if (subscriptionUpdate == null) { subscriptionUpdate = new ArrayList<>(); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java b/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java index 1b67ca0486..33a26997ca 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java @@ -29,6 +29,7 @@ import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.customer.CustomerService; import org.thingsboard.server.dao.dashboard.DashboardService; import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.user.UserService; @@ -46,6 +47,9 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe @Autowired private DeviceService deviceService; + @Autowired + private EntityViewService entityViewService; + @Autowired private TenantService tenantService; @@ -81,6 +85,9 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe case DEVICE: hasName = deviceService.findDeviceByIdAsync(new DeviceId(entityId.getId())); break; + case ENTITY_VIEW: + hasName = entityViewService.findEntityViewByIdAsync(new EntityViewId(entityId.getId())); + break; case TENANT: hasName = tenantService.findTenantByIdAsync(new TenantId(entityId.getId())); break; diff --git a/dao/src/main/java/org/thingsboard/server/dao/entityview/CassandraEntityViewDao.java b/dao/src/main/java/org/thingsboard/server/dao/entityview/CassandraEntityViewDao.java index fe1a8d1f8d..6ddc18d463 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/entityview/CassandraEntityViewDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/entityview/CassandraEntityViewDao.java @@ -84,8 +84,7 @@ public class CassandraEntityViewDao extends CassandraAbstractSearchTextDao>> futures = new ArrayList<>(); - if (savedEntityView.getKeys() != null) { - futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.CLIENT_SCOPE, savedEntityView.getKeys().getAttributes().getCs())); - futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SERVER_SCOPE, savedEntityView.getKeys().getAttributes().getSs())); - futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SHARED_SCOPE, savedEntityView.getKeys().getAttributes().getSh())); - } - for (ListenableFuture> future : futures) { - try { - future.get(); - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to copy attributes to entity view", e); - throw new RuntimeException("Failed to copy attributes to entity view", e); - } - } return savedEntityView; } @@ -294,36 +279,6 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti }); } - private ListenableFuture> copyAttributesFromEntityToEntityView(EntityView entityView, String scope, Collection keys) { - if (keys != null && !keys.isEmpty()) { - ListenableFuture> getAttrFuture = attributesService.find(entityView.getEntityId(), scope, keys); - return Futures.transform(getAttrFuture, attributeKvEntries -> { - List filteredAttributes = new ArrayList<>(); - if (attributeKvEntries != null && !attributeKvEntries.isEmpty()) { - filteredAttributes = - attributeKvEntries.stream() - .filter(attributeKvEntry -> { - long startTime = entityView.getStartTimeMs(); - long endTime = entityView.getEndTimeMs(); - long lastUpdateTs = attributeKvEntry.getLastUpdateTs(); - return startTime == 0 && endTime == 0 || - (endTime == 0 && startTime < lastUpdateTs) || - (startTime == 0 && endTime > lastUpdateTs) - ? true : startTime < lastUpdateTs && endTime > lastUpdateTs; - }).collect(Collectors.toList()); - } - try { - return attributesService.save(entityView.getId(), scope, filteredAttributes).get(); - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to copy attributes to entity view", e); - throw new RuntimeException("Failed to copy attributes to entity view", e); - } - }); - } else { - return Futures.immediateFuture(null); - } - } - private DataValidator entityViewValidator = new DataValidator() { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 4a09bb7a23..4658cbe490 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -79,12 +79,16 @@ public class BaseTimeseriesService implements TimeseriesService { if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { EntityView entityView = entityViewService.findEntityViewById((EntityViewId) entityId); List filteredKeys = new ArrayList<>(keys); - if (!entityView.getKeys().getTimeseries().isEmpty()) { + if (entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null && + !entityView.getKeys().getTimeseries().isEmpty()) { filteredKeys.retainAll(entityView.getKeys().getTimeseries()); } List queries = filteredKeys.stream() - .map(key -> new BaseReadTsKvQuery(key, entityView.getStartTimeMs(), entityView.getEndTimeMs(), 1, "ASC")) + .map(key -> { + long endTs = entityView.getEndTimeMs() != 0 ? entityView.getEndTimeMs() : Long.MAX_VALUE; + return new BaseReadTsKvQuery(key, entityView.getStartTimeMs(), endTs, 1, "DESC"); + }) .collect(Collectors.toList()); if (queries.size() > 0) { @@ -100,7 +104,17 @@ public class BaseTimeseriesService implements TimeseriesService { @Override public ListenableFuture> findAllLatest(EntityId entityId) { validate(entityId); - return timeseriesDao.findAllLatest(entityId); + if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { + EntityView entityView = entityViewService.findEntityViewById((EntityViewId) entityId); + if (entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null && + !entityView.getKeys().getTimeseries().isEmpty()) { + return findLatest(entityId, entityView.getKeys().getTimeseries()); + } else { + return Futures.immediateFuture(new ArrayList<>()); + } + } else { + return timeseriesDao.findAllLatest(entityId); + } } @Override diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/AbstractServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/AbstractServiceTest.java index 1e1f15daff..f49c357caa 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/AbstractServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/AbstractServiceTest.java @@ -45,6 +45,7 @@ import org.thingsboard.server.dao.customer.CustomerService; import org.thingsboard.server.dao.dashboard.DashboardService; import org.thingsboard.server.dao.device.DeviceCredentialsService; import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.event.EventService; import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.rule.RuleChainService; @@ -88,6 +89,9 @@ public abstract class AbstractServiceTest { @Autowired protected AssetService assetService; + @Autowired + protected EntityViewService entityViewService; + @Autowired protected DeviceCredentialsService deviceCredentialsService; diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index 4528d9afb1..88f4d84eb6 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -17,9 +17,15 @@ package org.thingsboard.server.dao.service.timeseries; import com.datastax.driver.core.utils.UUIDs; import lombok.extern.slf4j.Slf4j; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; @@ -30,6 +36,7 @@ import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.objects.TelemetryEntityView; import org.thingsboard.server.dao.service.AbstractServiceTest; import java.util.ArrayList; @@ -61,6 +68,22 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { KvEntry doubleKvEntry = new DoubleDataEntry(DOUBLE_KEY, Double.MAX_VALUE); KvEntry booleanKvEntry = new BooleanDataEntry(BOOLEAN_KEY, Boolean.TRUE); + private TenantId tenantId; + + @Before + public void before() { + Tenant tenant = new Tenant(); + tenant.setTitle("My tenant"); + Tenant savedTenant = tenantService.saveTenant(tenant); + Assert.assertNotNull(savedTenant); + tenantId = savedTenant.getId(); + } + + @After + public void after() { + tenantService.deleteTenant(tenantId); + } + @Test public void testFindAllLatest() throws Exception { DeviceId deviceId = new DeviceId(UUIDs.timeBased()); @@ -69,7 +92,15 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { saveEntries(deviceId, TS - 1); saveEntries(deviceId, TS); - List tsList = tsService.findAllLatest(deviceId).get(); + testLatestTsAndVerify(deviceId); + + EntityView entityView = saveAndCreateEntityView(deviceId, Arrays.asList(STRING_KEY, DOUBLE_KEY, LONG_KEY, BOOLEAN_KEY)); + + testLatestTsAndVerify(entityView.getId()); + } + + private void testLatestTsAndVerify(EntityId entityId) throws ExecutionException, InterruptedException { + List tsList = tsService.findAllLatest(entityId).get(); assertNotNull(tsList); assertEquals(4, tsList.size()); @@ -89,6 +120,18 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(expected, tsList); } + private EntityView saveAndCreateEntityView(DeviceId deviceId, List timeseries) { + EntityView entityView = new EntityView(); + entityView.setName("entity_view_name"); + entityView.setType("default"); + entityView.setTenantId(tenantId); + TelemetryEntityView keys = new TelemetryEntityView(); + keys.setTimeseries(timeseries); + entityView.setKeys(keys); + entityView.setEntityId(deviceId); + return entityViewService.saveEntityView(entityView); + } + @Test public void testFindLatest() throws Exception { DeviceId deviceId = new DeviceId(UUIDs.timeBased()); @@ -100,6 +143,12 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { List entries = tsService.findLatest(deviceId, Collections.singleton(STRING_KEY)).get(); Assert.assertEquals(1, entries.size()); Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0)); + + EntityView entityView = saveAndCreateEntityView(deviceId, Arrays.asList(STRING_KEY)); + + entries = tsService.findLatest(entityView.getId(), Collections.singleton(STRING_KEY)).get(); + Assert.assertEquals(1, entries.size()); + Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0)); } @Test diff --git a/msa/docker/.env b/msa/docker/.env index 111ff1e2b4..018d4ac7b7 100644 --- a/msa/docker/.env +++ b/msa/docker/.env @@ -7,4 +7,4 @@ WEB_UI_DOCKER_NAME=tb-web-ui TB_VERSION=2.2.0-SNAPSHOT -KAFKA_TOPICS=js.eval.requests:100:1 +KAFKA_TOPICS=js.eval.requests:100:1:delete --config=retention.ms=60000 --config=retention.bytes=1073741824 diff --git a/msa/docker/docker-compose.yml b/msa/docker/docker-compose.yml index 93c2ab08cd..91e6286d69 100644 --- a/msa/docker/docker-compose.yml +++ b/msa/docker/docker-compose.yml @@ -36,6 +36,9 @@ services: KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE KAFKA_CREATE_TOPICS: "${KAFKA_TOPICS}" KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' + KAFKA_LOG_RETENTION_BYTES: 1073741824 + KAFKA_LOG_RETENTION_MS: 300000 + KAFKA_LOG_CLEANUP_POLICY: delete depends_on: - zookeeper tb-js-executor: diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java index 51386dc942..fc0ff28749 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java @@ -17,7 +17,9 @@ package org.thingsboard.rule.engine.action; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; +import com.google.gson.JsonElement; import com.google.gson.JsonParser; +import com.google.gson.JsonPrimitive; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; import org.thingsboard.rule.engine.api.RuleNode; @@ -25,7 +27,6 @@ import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.rule.engine.api.TbRelationTypes; import org.thingsboard.rule.engine.api.util.DonAsynchron; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.DataConstants; @@ -37,12 +38,11 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; -import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE; import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; @Slf4j @@ -68,70 +68,89 @@ public class TbCopyAttributesToEntityViewNode implements TbNode { } @Override - public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { - if (!msg.getMetaData().getData().isEmpty()) { - long now = System.currentTimeMillis(); - String scope = msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) ? - DataConstants.CLIENT_SCOPE : msg.getMetaData().getValue("scope"); + public void onMsg(TbContext ctx, TbMsg msg) { + if (DataConstants.ATTRIBUTES_UPDATED.equals(msg.getType()) || + DataConstants.ATTRIBUTES_DELETED.equals(msg.getType()) || + SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msg.getType())) { + if (!msg.getMetaData().getData().isEmpty()) { + long now = System.currentTimeMillis(); + String scope = msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) ? + DataConstants.CLIENT_SCOPE : msg.getMetaData().getValue("scope"); - ListenableFuture> entityViewsFuture = - ctx.getEntityViewService().findEntityViewsByTenantIdAndEntityIdAsync(ctx.getTenantId(), msg.getOriginator()); + ListenableFuture> entityViewsFuture = + ctx.getEntityViewService().findEntityViewsByTenantIdAndEntityIdAsync(ctx.getTenantId(), msg.getOriginator()); - DonAsynchron.withCallback(entityViewsFuture, - entityViews -> { - for (EntityView entityView : entityViews) { - long startTime = entityView.getStartTimeMs(); - long endTime = entityView.getEndTimeMs(); - if ((endTime != 0 && endTime > now && startTime < now) || (endTime == 0 && startTime < now)) { - Set attributes = - JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData())).getAttributes(); - List filteredAttributes = - attributes.stream() - .filter(attr -> { - switch (scope) { - case DataConstants.CLIENT_SCOPE: - if (entityView.getKeys().getAttributes().getCs().isEmpty()) { - return true; - } - return entityView.getKeys().getAttributes().getCs().contains(attr.getKey()); - case DataConstants.SERVER_SCOPE: - if (entityView.getKeys().getAttributes().getSs().isEmpty()) { - return true; - } - return entityView.getKeys().getAttributes().getSs().contains(attr.getKey()); - case DataConstants.SHARED_SCOPE: - if (entityView.getKeys().getAttributes().getSh().isEmpty()) { - return true; - } - return entityView.getKeys().getAttributes().getSh().contains(attr.getKey()); + DonAsynchron.withCallback(entityViewsFuture, + entityViews -> { + for (EntityView entityView : entityViews) { + long startTime = entityView.getStartTimeMs(); + long endTime = entityView.getEndTimeMs(); + if ((endTime != 0 && endTime > now && startTime < now) || (endTime == 0 && startTime < now)) { + if (DataConstants.ATTRIBUTES_UPDATED.equals(msg.getType()) || + SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msg.getType())) { + Set attributes = + JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData())).getAttributes(); + List filteredAttributes = + attributes.stream().filter(attr -> attributeContainsInEntityView(scope, attr.getKey(), entityView)).collect(Collectors.toList()); + ctx.getTelemetryService().saveAndNotify(entityView.getId(), scope, filteredAttributes, + new FutureCallback() { + @Override + public void onSuccess(@Nullable Void result) { + transformAndTellNext(ctx, msg, entityView); } - return false; - }).collect(Collectors.toList()); - ctx.getTelemetryService().saveAndNotify(entityView.getId(), scope, filteredAttributes, - new FutureCallback() { - @Override - public void onSuccess(@Nullable Void result) { - TbMsg updMsg = ctx.transformMsg(msg, msg.getType(), entityView.getId(), msg.getMetaData(), msg.getData()); - ctx.tellNext(updMsg, SUCCESS); + @Override + public void onFailure(Throwable t) { + ctx.tellFailure(msg, t); + } + }); + } else if (DataConstants.ATTRIBUTES_DELETED.equals(msg.getType())) { + List attributes = new ArrayList<>(); + for (JsonElement element : new JsonParser().parse(msg.getData()).getAsJsonObject().get("attributes").getAsJsonArray()) { + if (element.isJsonPrimitive()) { + JsonPrimitive value = element.getAsJsonPrimitive(); + if (value.isString()) { + attributes.add(value.getAsString()); + } } - - @Override - public void onFailure(Throwable t) { - ctx.tellFailure(msg, t); - } - }); + } + List filteredAttributes = + attributes.stream().filter(attr -> attributeContainsInEntityView(scope, attr, entityView)).collect(Collectors.toList()); + if (filteredAttributes != null && !filteredAttributes.isEmpty()) { + ctx.getAttributesService().removeAll(entityView.getId(), scope, filteredAttributes); + transformAndTellNext(ctx, msg, entityView); + } + } + } } - } - }, - t -> ctx.tellFailure(msg, t)); + }, + t -> ctx.tellFailure(msg, t)); + } else { + ctx.tellFailure(msg, new IllegalArgumentException("Message metadata is empty")); + } } else { - ctx.tellNext(msg, FAILURE); + ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type [" + msg.getType() + "]")); } } + private void transformAndTellNext(TbContext ctx, TbMsg msg, EntityView entityView) { + TbMsg updMsg = ctx.transformMsg(msg, msg.getType(), entityView.getId(), msg.getMetaData(), msg.getData()); + ctx.tellNext(updMsg, SUCCESS); + } + + private boolean attributeContainsInEntityView(String scope, String attrKey, EntityView entityView) { + switch (scope) { + case DataConstants.CLIENT_SCOPE: + return entityView.getKeys().getAttributes().getCs().contains(attrKey); + case DataConstants.SERVER_SCOPE: + return entityView.getKeys().getAttributes().getSs().contains(attrKey); + case DataConstants.SHARED_SCOPE: + return entityView.getKeys().getAttributes().getSh().contains(attrKey); + } + return false; + } + @Override public void destroy() { - } } diff --git a/ui/src/app/api/entity-view.service.js b/ui/src/app/api/entity-view.service.js index e34d3a4aee..23fefbc05b 100644 --- a/ui/src/app/api/entity-view.service.js +++ b/ui/src/app/api/entity-view.service.js @@ -27,7 +27,6 @@ function EntityViewService($http, $q, $window, userService, attributeService, cu deleteEntityView: deleteEntityView, getCustomerEntityViews: getCustomerEntityViews, getEntityView: getEntityView, - getEntityViews: getEntityViews, getTenantEntityViews: getTenantEntityViews, saveEntityView: saveEntityView, unassignEntityViewFromCustomer: unassignEntityViewFromCustomer, @@ -126,32 +125,6 @@ function EntityViewService($http, $q, $window, userService, attributeService, cu return deferred.promise; } - function getEntityViews(entityViewIds, config) { - var deferred = $q.defer(); - var ids = ''; - for (var i=0;i0) { - ids += ','; - } - ids += entityViewIds[i]; - } - var url = '/api/entityViews?entityViewIds=' + ids; - $http.get(url, config).then(function success(response) { - var entityViews = response.data; - entityViews.sort(function (entityView1, entityView2) { - var id1 = entityView1.id.id; - var id2 = entityView2.id.id; - var index1 = entityViewIds.indexOf(id1); - var index2 = entityViewIds.indexOf(id2); - return index1 - index2; - }); - deferred.resolve(entityViews); - }, function fail(response) { - deferred.reject(response.data); - }); - return deferred.promise; - } - function saveEntityView(entityView) { var deferred = $q.defer(); var url = '/api/entityView'; diff --git a/ui/src/app/api/entity.service.js b/ui/src/app/api/entity.service.js index 205abc9e4e..00c7ecf990 100644 --- a/ui/src/app/api/entity.service.js +++ b/ui/src/app/api/entity.service.js @@ -135,6 +135,10 @@ function EntityService($http, $q, $filter, $translate, $log, userService, device case types.entityType.asset: promise = assetService.getAssets(entityIds, config); break; + case types.entityType.entityView: + promise = getEntitiesByIdsPromise( + (id) => entityViewService.getEntityView(id, config), entityIds); + break; case types.entityType.tenant: promise = getEntitiesByIdsPromise( (id) => tenantService.getTenant(id, config), entityIds); diff --git a/ui/src/app/entity-view/add-entity-view.tpl.html b/ui/src/app/entity-view/add-entity-view.tpl.html index 48a1788ed0..ebf1f1d341 100644 --- a/ui/src/app/entity-view/add-entity-view.tpl.html +++ b/ui/src/app/entity-view/add-entity-view.tpl.html @@ -15,7 +15,7 @@ limitations under the License. --> - +
diff --git a/ui/src/app/entity-view/entity-view-fieldset.tpl.html b/ui/src/app/entity-view/entity-view-fieldset.tpl.html index 66dc116cc8..140c199688 100644 --- a/ui/src/app/entity-view/entity-view-fieldset.tpl.html +++ b/ui/src/app/entity-view/entity-view-fieldset.tpl.html @@ -60,7 +60,7 @@ entity-type="types.entityType.entityView">
- +
+ + + +
{{ 'entity-view.attributes-propagation' | translate }}
+ + +
+ + +
{{ 'entity-view.attributes-propagation' | translate }}
+ + +
+ +
entity-view.attributes-propagation-hint
+ + + + {{item}} + + + + + + {{item}} + + + + + + {{item}} + + +
+
+
+ + +
{{ 'entity-view.timeseries-data' | translate }}
+ + +
+ + +
{{ 'entity-view.timeseries-data' | translate }}
+ + +
+ +
entity-view.timeseries-data-hint
+ + + + {{item}} + + +
+
+
+
+
+ + +
+
+ + +
-
- - - - - - - - - - - - -
-
-
- - -
-
- - -
-
diff --git a/ui/src/app/entity-view/entity-view.directive.js b/ui/src/app/entity-view/entity-view.directive.js index e1ae82fa6a..25377f4cb4 100644 --- a/ui/src/app/entity-view/entity-view.directive.js +++ b/ui/src/app/entity-view/entity-view.directive.js @@ -13,6 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +import './entity-view.scss'; + /* eslint-disable import/no-unresolved, import/default */ import entityViewFieldsetTemplate from './entity-view-fieldset.tpl.html'; @@ -20,12 +23,16 @@ import entityViewFieldsetTemplate from './entity-view-fieldset.tpl.html'; /* eslint-enable import/no-unresolved, import/default */ /*@ngInject*/ -export default function EntityViewDirective($compile, $templateCache, $filter, toast, $translate, $mdConstant, - types, clipboardService, entityViewService, customerService) { +export default function EntityViewDirective($q, $compile, $templateCache, $filter, toast, $translate, $mdConstant, $mdExpansionPanel, + types, clipboardService, entityViewService, customerService, entityService) { var linker = function (scope, element) { var template = $templateCache.get(entityViewFieldsetTemplate); element.html(template); + scope.attributesPanelId = (Math.random()*1000).toFixed(0); + scope.timeseriesPanelId = (Math.random()*1000).toFixed(0); + scope.$mdExpansionPanel = $mdExpansionPanel; + scope.types = types; scope.isAssignedToCustomer = false; scope.isPublic = false; @@ -53,9 +60,13 @@ export default function EntityViewDirective($compile, $templateCache, $filter, t } if (scope.entityView.startTimeMs > 0) { scope.startTimeMs = new Date(scope.entityView.startTimeMs); + } else { + scope.startTimeMs = null; } if (scope.entityView.endTimeMs > 0) { scope.endTimeMs = new Date(scope.entityView.endTimeMs); + } else { + scope.endTimeMs = null; } if (!scope.entityView.keys) { scope.entityView.keys = {}; @@ -68,6 +79,19 @@ export default function EntityViewDirective($compile, $templateCache, $filter, t } }); + scope.dataKeysSearch = function (searchText, type) { + var deferred = $q.defer(); + entityService.getEntityKeys(scope.entityView.entityId.entityType, scope.entityView.entityId.id, searchText, type, {ignoreLoading: true}).then( + function success(keys) { + deferred.resolve(keys); + }, + function fail() { + deferred.resolve([]); + } + ); + return deferred.promise; + + }; scope.$watch('startTimeMs', function (newDate) { if (newDate) { diff --git a/ui/src/app/entity-view/entity-view.scss b/ui/src/app/entity-view/entity-view.scss new file mode 100644 index 0000000000..7b16b4a85d --- /dev/null +++ b/ui/src/app/entity-view/entity-view.scss @@ -0,0 +1,47 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@import "../../scss/constants"; + +.tb-entity-view-panel-group { + .tb-panel-title { + min-width: 90px; + user-select: none; + + @media (min-width: $layout-breakpoint-sm) { + min-width: 180px; + } + } + + .tb-panel-prompt { + overflow: hidden; + font-size: 14px; + color: rgba(0, 0, 0, .87); + text-overflow: ellipsis; + white-space: nowrap; + } + + &.disabled { + .tb-panel-title, + .tb-panel-prompt { + color: rgba(0, 0, 0, .38); + } + } + + md-icon.md-expansion-panel-icon { + margin-right: 0; + } +} diff --git a/ui/src/app/locale/locale.constant-en_US.json b/ui/src/app/locale/locale.constant-en_US.json index 55a17223ac..1fde4b6d47 100644 --- a/ui/src/app/locale/locale.constant-en_US.json +++ b/ui/src/app/locale/locale.constant-en_US.json @@ -838,14 +838,24 @@ "unable-entity-view-device-alias-text": "Device alias '{{entityViewAlias}}' can't be deleted as it used by the following widget(s):
{{widgetsList}}", "select-entity-view": "Select entity view", "make-public": "Make entity view public", + "start-date": "Start date", "start-ts": "Start time", + "end-date": "End date", "end-ts": "End time", "date-limits": "Date limits", "client-attributes": "Client attributes", "shared-attributes": "Shared attributes", "server-attributes": "Server attributes", - "latest-timeseries": "Latest timeseries", - "related-entity": "Related entity" + "timeseries": "Timeseries", + "client-attributes-placeholder": "Client attributes", + "shared-attributes-placeholder": "Shared attributes", + "server-attributes-placeholder": "Server attributes", + "timeseries-placeholder": "Timeseries", + "target-entity": "Target entity", + "attributes-propagation": "Attributes propagation", + "attributes-propagation-hint": "Entity View will automatically copy specified attributes from Target Entity each time you save or update this entity view. For performance reasons target entity attributes are not propagated to entity view on each attribute change. You can enable automatic propagation by configuring \"copy to view\" rule node in your rule chain and linking \"Post attributes\" and \"Attributes Updated\" messages to the new rule node.", + "timeseries-data": "Timeseries data", + "timeseries-data-hint": "Configure timeseries data keys of the target entity that will be accessible to the entity view. This timeseries data is read-only." }, "event": { "event-type": "Event type", diff --git a/ui/src/app/locale/locale.constant-es_ES.json b/ui/src/app/locale/locale.constant-es_ES.json index ec0e2272d2..81cc52ed6f 100644 --- a/ui/src/app/locale/locale.constant-es_ES.json +++ b/ui/src/app/locale/locale.constant-es_ES.json @@ -839,7 +839,7 @@ "client-attributes": "Client attributes", "shared-attributes": "Shared attributes", "server-attributes": "Server attributes", - "latest-timeseries": "Latest timeseries" + "timeseries": "Timeseries" }, "event": { "event-type": "Tipo de evento",